はじめに
前回、C#で2つのテーブル間の差分をチェックするバッチ処理を作成したが、今度はPL/pgSQLのストアドプロシージャで同じ処理を実装してみる。
環境
PostgreSQL 17GitHub
今回作成したプロシージャのソースコードは以下のリポジトリで公開している。
https://github.com/katsuobushiFPGA/postgresql-procedure-dev
- 作成したテーブルとプロシージャ
プロシージャの動作について
このストアドプロシージャを実行すると、以下のような動作をする。
- トランザクションテーブル(
t_data)と最新データテーブル(t_datalatest)を比較 - 差分がないレコードのみをトランザクションテーブルから削除
- 差分があるレコードは最新データテーブルを更新(UPSERT)
- 新規レコードは最新データテーブルに挿入
- 処理結果をログ出力
例えば、以下のような流れになる。
-- 実行前
SELECT 'トランザクションテーブル' as table_name, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル', COUNT(*) FROM t_datalatest;
※件数確認
-- プロシージャ実行
CALL sync_t_data_to_t_datalatest_upsert_correct();
NOTICE: 差分チェック処理を開始します...
NOTICE: 処理件数: 3件
NOTICE: UPSERT総処理件数: 2件
-- 実行後
SELECT 'トランザクションテーブル' as table_name, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル', COUNT(*) FROM t_datalatest;
※件数確認処理フローとシーケンスについて
処理フロー図
flowchart TD
A[プロシージャ開始] --> B[差分チェック処理開始ログ出力]
B --> C[t_dataのレコード数取得
total_count]
C --> D[DELETE処理:
差分がないレコードをt_dataから削除]
D --> E[削除件数を取得
deleted_count]
E --> F[UPSERT処理:
t_dataの全レコードをt_datalatest に処理]
F --> G[UPSERT件数を取得
upsert_count]
G --> H[処理結果サマリーをログ出力]
H --> I[プロシージャ正常終了]
D --> J{エラー発生?}
F --> J
J -->|Yes| K[エラーメッセージ出力
EXCEPTION]
K --> L[プロシージャ異常終了]
J -->|No| H
style A fill:#e1f5fe
style I fill:#c8e6c9
style L fill:#ffcdd2
style D fill:#fff3e0
style F fill:#f3e5f5
シーケンス図
sequenceDiagram
participant Client as クライアント
participant Proc as sync_procedure
participant TableA as t_data
participant TableB as t_datalatest
participant Log as ログ出力
Client->>Proc: CALL sync_t_data_to_t_datalatest_upsert_correct()
Proc->>Log: NOTICE '差分チェック処理を開始します...'
Note over Proc,TableA: ステップ1: 処理対象件数の取得
Proc->>TableA: SELECT COUNT(*) FROM t_data
TableA-->>Proc: total_count
Note over Proc,TableB: ステップ2: 差分なしレコードの削除
Proc->>TableA: DELETE FROM t_data USING t_datalatest
WHERE 主キー一致 AND 全カラム一致
TableA-->>Proc: deleted_count
Note over Proc,TableB: ステップ3: UPSERT処理
Proc->>TableB: INSERT INTO t_datalatest
SELECT * FROM t_data
ON CONFLICT DO UPDATE
WHERE 差分あり
TableB-->>Proc: upsert_count
Note over Proc,Log: ステップ4: 結果出力
Proc->>Log: NOTICE '処理件数: total_count件'
Proc->>Log: NOTICE 'UPSERT総処理件数: upsert_count件'
Proc->>Log: NOTICE 'Aから削除(差分なし): deleted_count件'
Proc-->>Client: 正常終了
Note over Proc: エラー時
alt エラー発生
Proc->>Log: EXCEPTION 'UPSERT版同期処理中にエラーが発生しました'
Proc-->>Client: 異常終了
end
テーブル構造
前提条件
-
PostgreSQLデータベースが準備されていること
テーブル作成SQL
-- テーブルA(トランザクションテーブル)
-- 随時データが入れ替わるテーブル
DROP TABLE IF EXISTS t_data CASCADE;
CREATE TABLE IF NOT EXISTS t_data (
pkey_1 VARCHAR(50) NOT NULL,
pkey_2 VARCHAR(50) NOT NULL,
pkey_3 VARCHAR(50) NOT NULL,
pkey_4 VARCHAR(50) NOT NULL,
column_1 TEXT,
column_2 TEXT,
column_3 TEXT,
column_4 TEXT,
column_5 TEXT,
column_6 TEXT,
column_7 TEXT,
column_8 TEXT,
column_9 TEXT,
column_10 TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT pk_t_data PRIMARY KEY (pkey_1, pkey_2, pkey_3, pkey_4)
);
-- テーブルB(最新状態保持テーブル)
-- 最新の状態を保持するテーブル
DROP TABLE IF EXISTS t_datalatest CASCADE;
CREATE TABLE IF NOT EXISTS t_datalatest (
pkey_1 VARCHAR(50) NOT NULL,
pkey_2 VARCHAR(50) NOT NULL,
pkey_3 VARCHAR(50) NOT NULL,
pkey_4 VARCHAR(50) NOT NULL,
column_1 TEXT,
column_2 TEXT,
column_3 TEXT,
column_4 TEXT,
column_5 TEXT,
column_6 TEXT,
column_7 TEXT,
column_8 TEXT,
column_9 TEXT,
column_10 TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT pk_t_datalatest PRIMARY KEY (pkey_1, pkey_2, pkey_3, pkey_4)
);PL/pgSQLプロシージャの実装
差分検出・同期プロシージャ
-- テーブルAとテーブルBの差分更新プロシージャ
-- 要件:
-- ・AとBを比較して差分がないレコードのみをAから削除
-- ・差分があるレコードはBを更新(Aからは削除しない)
-- ・BにないレコードはBに挿入(Aからは削除しない)
CREATE OR REPLACE PROCEDURE sync_t_data_to_t_datalatest_upsert_correct()
LANGUAGE plpgsql
AS
$$
DECLARE
total_count INTEGER;
deleted_count INTEGER := 0;
upsert_count INTEGER := 0;
BEGIN
RAISE NOTICE '差分チェック処理を開始します...';
-- テーブルAのレコード数を取得
SELECT COUNT(*) INTO total_count FROM t_data;
-- 差分がない場合のみテーブルAからの削除
DELETE FROM t_data
USING t_datalatest
WHERE t_data.pkey_1 = t_datalatest.pkey_1
AND t_data.pkey_2 = t_datalatest.pkey_2
AND t_data.pkey_3 = t_datalatest.pkey_3
AND t_data.pkey_4 = t_datalatest.pkey_4
AND t_data.column_1 IS NOT DISTINCT FROM t_datalatest.column_1
AND t_data.column_2 IS NOT DISTINCT FROM t_datalatest.column_2
AND t_data.column_3 IS NOT DISTINCT FROM t_datalatest.column_3
AND t_data.column_4 IS NOT DISTINCT FROM t_datalatest.column_4
AND t_data.column_5 IS NOT DISTINCT FROM t_datalatest.column_5
AND t_data.column_6 IS NOT DISTINCT FROM t_datalatest.column_6
AND t_data.column_7 IS NOT DISTINCT FROM t_datalatest.column_7
AND t_data.column_8 IS NOT DISTINCT FROM t_datalatest.column_8
AND t_data.column_9 IS NOT DISTINCT FROM t_datalatest.column_9
AND t_data.column_10 IS NOT DISTINCT FROM t_datalatest.column_10;
GET DIAGNOSTICS deleted_count = ROW_COUNT;
-- UPSERT (INSERT ... ON CONFLICT DO UPDATE) で一括処理
-- 差分がある場合のみ更新、新規の場合は挿入
INSERT INTO t_datalatest (
pkey_1, pkey_2, pkey_3, pkey_4,
column_1, column_2, column_3, column_4, column_5,
column_6, column_7, column_8, column_9, column_10
)
SELECT
a.pkey_1, a.pkey_2, a.pkey_3, a.pkey_4,
a.column_1, a.column_2, a.column_3, a.column_4, a.column_5,
a.column_6, a.column_7, a.column_8, a.column_9, a.column_10
FROM t_data a
ON CONFLICT (pkey_1, pkey_2, pkey_3, pkey_4)
DO UPDATE SET
column_1 = EXCLUDED.column_1,
column_2 = EXCLUDED.column_2,
column_3 = EXCLUDED.column_3,
column_4 = EXCLUDED.column_4,
column_5 = EXCLUDED.column_5,
column_6 = EXCLUDED.column_6,
column_7 = EXCLUDED.column_7,
column_8 = EXCLUDED.column_8,
column_9 = EXCLUDED.column_9,
column_10 = EXCLUDED.column_10,
updated_at = CURRENT_TIMESTAMP
WHERE (
t_datalatest.column_1 IS DISTINCT FROM EXCLUDED.column_1 OR
t_datalatest.column_2 IS DISTINCT FROM EXCLUDED.column_2 OR
t_datalatest.column_3 IS DISTINCT FROM EXCLUDED.column_3 OR
t_datalatest.column_4 IS DISTINCT FROM EXCLUDED.column_4 OR
t_datalatest.column_5 IS DISTINCT FROM EXCLUDED.column_5 OR
t_datalatest.column_6 IS DISTINCT FROM EXCLUDED.column_6 OR
t_datalatest.column_7 IS DISTINCT FROM EXCLUDED.column_7 OR
t_datalatest.column_8 IS DISTINCT FROM EXCLUDED.column_8 OR
t_datalatest.column_9 IS DISTINCT FROM EXCLUDED.column_9 OR
t_datalatest.column_10 IS DISTINCT FROM EXCLUDED.column_10
);
GET DIAGNOSTICS upsert_count = ROW_COUNT;
-- 処理結果のサマリーを出力
RAISE NOTICE '処理件数: %件', total_count;
RAISE NOTICE 'UPSERT総処理件数: %件', upsert_count;
RAISE NOTICE 'Aから削除(差分なし): %件', deleted_count;
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'UPSERT版同期処理中にエラーが発生しました: %', SQLERRM;
END;
$$;プロシージャの特徴
処理について
- 差分なしレコードの削除 トランザクションテーブルから完全一致レコードを削除
- UPSERT処理 残ったレコードを最新テーブルに挿入/更新
- 処理結果の出力 各段階の処理件数をログ出力
重要なSQL構文
| 構文 | 役割 |
|---|---|
IS NOT DISTINCT FROM | NULL値の比較が可能な構文 |
INSERT ... ON CONFLICT | UPSERT構文 |
GET DIAGNOSTICS | 直前のSQL文で処理されたレコード数を取得 |
RAISE NOTICE | 処理状況をクライアントに通知 |
実行SQLの詳細解説
1. 差分なしレコードの削除SQL
DELETE FROM t_data
USING t_datalatest
WHERE t_data.pkey_1 = t_datalatest.pkey_1
AND t_data.pkey_2 = t_datalatest.pkey_2
-- 主キー4つすべて一致
AND t_data.column_1 IS NOT DISTINCT FROM t_datalatest.column_1
-- データカラム10個すべて一致(NULL値も考慮)DELETE ... USING構文で2つのテーブルを結合して条件に合うレコードを削除IS NOT DISTINCT FROMにより、NULLとNULLも「一致」として扱う- 主キー4つ + データカラム10個 = 計14項目すべてが一致する場合のみ削除
2. UPSERT処理のSQL
INSERT INTO t_datalatest (...)
SELECT ... FROM t_data a
ON CONFLICT (pkey_1, pkey_2, pkey_3, pkey_4)
DO UPDATE SET column_1 = EXCLUDED.column_1, ...
WHERE (
t_datalatest.column_1 IS DISTINCT FROM EXCLUDED.column_1 OR
t_datalatest.column_2 IS DISTINCT FROM EXCLUDED.column_2 OR
-- データ差分がある場合のみ更新実行
);INSERTで新規データの挿入を試行- 主キー重複時は
ON CONFLICTで更新処理に移行 EXCLUDEDは挿入しようとしたデータを参照WHERE句で各カラムを比較し、実際に差分がある場合のみ更新を実行
差分検出ロジック
| 処理 | 条件 | 動作 |
|---|---|---|
| 削除 | 主キー+データ用のカラムが完全一致 | トランザクション(t_data)から該当レコードを削除 |
| 更新 | 主キー一致 + データ差分あり | 最新テーブル(t_datalatest)を更新 |
| 挿入 | 主キーが最新テーブルに存在しない | 最新テーブル(t_datalatest)に新規挿入 |
テストと実行結果
サンプルデータの準備
-- サンプルデータの挿入(テスト用)
INSERT INTO t_data (pkey_1, pkey_2, pkey_3, pkey_4, column_1, column_2, column_3, column_4, column_5, column_6, column_7, column_8, column_9, column_10)
VALUES
('A001', 'B001', 'C001', 'D001', 'データ1-1', 'データ1-2', 'データ1-3', 'データ1-4', 'データ1-5', 'データ1-6', 'データ1-7', 'データ1-8', 'データ1-9', 'データ1-10'),
('A002', 'B002', 'C002', 'D002', 'データ2-1', 'データ2-2', 'データ2-3', 'データ2-4', 'データ2-5', 'データ2-6', 'データ2-7', 'データ2-8', 'データ2-9', 'データ2-10'),
('A003', 'B003', 'C003', 'D003', 'データ3-1', 'データ3-2', 'データ3-3', 'データ3-4', 'データ3-5', 'データ3-6', 'データ3-7', 'データ3-8', 'データ3-9', 'データ3-10')
ON CONFLICT DO NOTHING;
INSERT INTO t_datalatest (pkey_1, pkey_2, pkey_3, pkey_4, column_1, column_2, column_3, column_4, column_5, column_6, column_7, column_8, column_9, column_10)
VALUES
('A001', 'B001', 'C001', 'D001', 'データ1-1', 'データ1-2', 'データ1-3', 'データ1-4', 'データ1-5', 'データ1-6', 'データ1-7', 'データ1-8', 'データ1-9', 'データ1-10'),
('A002', 'B002', 'C002', 'D002', '古いデータ2-1', '古いデータ2-2', '古いデータ2-3', '古いデータ2-4', '古いデータ2-5', '古いデータ2-6', '古いデータ2-7', '古いデータ2-8', '古いデータ2-9', '古いデータ2-10')
ON CONFLICT DO NOTHING;実行例
実行前のテーブル状態:
-- 実行前確認
SELECT 'トランザクションテーブル件数' as label, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル件数', COUNT(*) FROM t_datalatest;| label | count |
|---|---|
| トランザクションテーブル件数 | 3 |
| 最新データテーブル件数 | 2 |
プロシージャ実行:
CALL sync_t_data_to_t_datalatest_upsert_correct();実行ログ:
NOTICE: 差分チェック処理を開始します...
NOTICE: 処理件数: 3件
NOTICE: UPSERT総処理件数: 2件
NOTICE: Aから削除(差分なし): 1件
CALL実行後のテーブル状態:
-- 実行後確認
SELECT 'トランザクションテーブル件数' as label, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル件数', COUNT(*) FROM t_datalatest;| label | count |
|---|---|
| トランザクションテーブル件数 | 2 |
| 最新データテーブル件数 | 3 |
処理結果の詳細
- A001 - 完全一致のため、
t_dataから削除 - A002 - データ差分ありのため、
t_datalatestを更新 - A003 - 新規データのため、
t_datalatestに挿入
参考
PostgreSQLDocumentation - PL/pgSQL
https://www.postgresql.org/docs/current/plpgsql.htmlPostgreSQLDocumentation - INSERT ON CONFLICT
https://www.postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICTPostgreSQLDocumentation - IS DISTINCT FROM
https://www.postgresql.org/docs/current/functions-comparison.htmlGitHub Repository- postgresql-procedure-dev
https://github.com/katsuobushiFPGA/postgresql-procedure-devSQLでNULLとの比較結果をTRUE/FALSEにしたいときはIS DISTINCT FROM演算子を使う | Qiita
https://qiita.com/gooddoog/items/ad8c20b1734d5811bf78
おわりに
今回は、PL/pgSQLでテーブル間の差分検出・同期処理を実装してみた。
C#の場合はエンティティなどを作成する必要があるが、プロシージャの場合はSQLを組み合わせれば実装できるため比較的楽に感じる。
ただし、プロシージャのデバッグが大変なので、デバッグや単体テストについて良い方法がないかを今後調べてみる予定である。