PL/pgSQLで2つのテーブル間の差分を検出してみる

はじめに

前回、C#で2つのテーブル間の差分をチェックするバッチ処理を作成したが、今度はPL/pgSQLのストアドプロシージャで同じ処理を実装してみる。

環境

PostgreSQL 17

GitHub

今回作成したプロシージャのソースコードは以下のリポジトリで公開している。

https://github.com/katsuobushiFPGA/postgresql-procedure-dev

プロシージャの動作について

このストアドプロシージャを実行すると、以下のような動作をする。

  1. トランザクションテーブルt_data)と最新データテーブルt_datalatest)を比較
  2. 差分がないレコードのみをトランザクションテーブルから削除
  3. 差分があるレコードは最新データテーブルを更新(UPSERT)
  4. 新規レコードは最新データテーブルに挿入
  5. 処理結果をログ出力

例えば、以下のような流れになる。

-- 実行前
SELECT 'トランザクションテーブル' as table_name, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル', COUNT(*) FROM t_datalatest;
※件数確認

-- プロシージャ実行
CALL sync_t_data_to_t_datalatest_upsert_correct();

NOTICE:  差分チェック処理を開始します...
NOTICE:  処理件数: 3
NOTICE:  UPSERT総処理件数: 2

-- 実行後
SELECT 'トランザクションテーブル' as table_name, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル', COUNT(*) FROM t_datalatest;

※件数確認

処理フローとシーケンスについて

処理フロー図

Yes

No

プロシージャ開始

差分チェック処理開始ログ出力

t_dataのレコード数取得
total_count

DELETE処理:
差分がないレコードをt_dataから削除

削除件数を取得
deleted_count

UPSERT処理:
t_dataの全レコードをt_datalatest に処理

UPSERT件数を取得
upsert_count

処理結果サマリーをログ出力

プロシージャ正常終了

エラー発生?

エラーメッセージ出力
EXCEPTION

プロシージャ異常終了

シーケンス図

ログ出力t_datalatestt_datasync_procedureクライアントログ出力t_datalatestt_datasync_procedureクライアントステップ1: 処理対象件数の取得ステップ2: 差分なしレコードの削除ステップ3: UPSERT処理ステップ4: 結果出力エラー時alt[エラー発生]CALL sync_t_data_to_t_datalatest_upsert_correct()NOTICE '差分チェック処理を開始します...'SELECT COUNT(*) FROM t_datatotal_countDELETE FROM t_data USING t_datalatestWHERE 主キー一致 AND 全カラム一致deleted_countINSERT INTO t_datalatestSELECT * FROM t_dataON CONFLICT DO UPDATEWHERE 差分ありupsert_countNOTICE '処理件数: total_count件'NOTICE 'UPSERT総処理件数: upsert_count件'NOTICE 'Aから削除(差分なし): deleted_count件'正常終了EXCEPTION 'UPSERT版同期処理中にエラーが発生しました'異常終了

テーブル構造

前提条件

  • PostgreSQLデータベースが準備されていること
テーブル作成SQL
-- テーブルA(トランザクションテーブル)
-- 随時データが入れ替わるテーブル
DROP TABLE IF EXISTS t_data CASCADE;
CREATE TABLE IF NOT EXISTS t_data (
    pkey_1 VARCHAR(50) NOT NULL,
    pkey_2 VARCHAR(50) NOT NULL,
    pkey_3 VARCHAR(50) NOT NULL,
    pkey_4 VARCHAR(50) NOT NULL,
    column_1 TEXT,
    column_2 TEXT,
    column_3 TEXT,
    column_4 TEXT,
    column_5 TEXT,
    column_6 TEXT,
    column_7 TEXT,
    column_8 TEXT,
    column_9 TEXT,
    column_10 TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT pk_t_data PRIMARY KEY (pkey_1, pkey_2, pkey_3, pkey_4)
);

-- テーブルB(最新状態保持テーブル)
-- 最新の状態を保持するテーブル
DROP TABLE IF EXISTS t_datalatest CASCADE;
CREATE TABLE IF NOT EXISTS t_datalatest (
    pkey_1 VARCHAR(50) NOT NULL,
    pkey_2 VARCHAR(50) NOT NULL,
    pkey_3 VARCHAR(50) NOT NULL,
    pkey_4 VARCHAR(50) NOT NULL,
    column_1 TEXT,
    column_2 TEXT,
    column_3 TEXT,
    column_4 TEXT,
    column_5 TEXT,
    column_6 TEXT,
    column_7 TEXT,
    column_8 TEXT,
    column_9 TEXT,
    column_10 TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT pk_t_datalatest PRIMARY KEY (pkey_1, pkey_2, pkey_3, pkey_4)
);

PL/pgSQLプロシージャの実装

差分検出・同期プロシージャ

-- テーブルAとテーブルBの差分更新プロシージャ
-- 要件:
-- ・AとBを比較して差分がないレコードのみをAから削除
-- ・差分があるレコードはBを更新(Aからは削除しない)
-- ・BにないレコードはBに挿入(Aからは削除しない)

CREATE OR REPLACE PROCEDURE sync_t_data_to_t_datalatest_upsert_correct()
LANGUAGE plpgsql
AS
$$
DECLARE
    total_count INTEGER;
    deleted_count INTEGER := 0;
    upsert_count INTEGER := 0;
BEGIN
    RAISE NOTICE '差分チェック処理を開始します...';
    
    -- テーブルAのレコード数を取得
    SELECT COUNT(*) INTO total_count FROM t_data;

    -- 差分がない場合のみテーブルAからの削除
    DELETE FROM t_data 
    USING t_datalatest 
    WHERE t_data.pkey_1 = t_datalatest.pkey_1 
      AND t_data.pkey_2 = t_datalatest.pkey_2 
      AND t_data.pkey_3 = t_datalatest.pkey_3 
      AND t_data.pkey_4 = t_datalatest.pkey_4
      AND t_data.column_1 IS NOT DISTINCT FROM t_datalatest.column_1
      AND t_data.column_2 IS NOT DISTINCT FROM t_datalatest.column_2
      AND t_data.column_3 IS NOT DISTINCT FROM t_datalatest.column_3
      AND t_data.column_4 IS NOT DISTINCT FROM t_datalatest.column_4
      AND t_data.column_5 IS NOT DISTINCT FROM t_datalatest.column_5
      AND t_data.column_6 IS NOT DISTINCT FROM t_datalatest.column_6
      AND t_data.column_7 IS NOT DISTINCT FROM t_datalatest.column_7
      AND t_data.column_8 IS NOT DISTINCT FROM t_datalatest.column_8
      AND t_data.column_9 IS NOT DISTINCT FROM t_datalatest.column_9
      AND t_data.column_10 IS NOT DISTINCT FROM t_datalatest.column_10;
      
    GET DIAGNOSTICS deleted_count = ROW_COUNT;

    -- UPSERT (INSERT ... ON CONFLICT DO UPDATE) で一括処理
    -- 差分がある場合のみ更新、新規の場合は挿入
    INSERT INTO t_datalatest (
        pkey_1, pkey_2, pkey_3, pkey_4,
        column_1, column_2, column_3, column_4, column_5,
        column_6, column_7, column_8, column_9, column_10
    )
    SELECT 
        a.pkey_1, a.pkey_2, a.pkey_3, a.pkey_4,
        a.column_1, a.column_2, a.column_3, a.column_4, a.column_5,
        a.column_6, a.column_7, a.column_8, a.column_9, a.column_10
    FROM t_data a
    ON CONFLICT (pkey_1, pkey_2, pkey_3, pkey_4) 
    DO UPDATE SET
        column_1 = EXCLUDED.column_1,
        column_2 = EXCLUDED.column_2,
        column_3 = EXCLUDED.column_3,
        column_4 = EXCLUDED.column_4,
        column_5 = EXCLUDED.column_5,
        column_6 = EXCLUDED.column_6,
        column_7 = EXCLUDED.column_7,
        column_8 = EXCLUDED.column_8,
        column_9 = EXCLUDED.column_9,
        column_10 = EXCLUDED.column_10,
        updated_at = CURRENT_TIMESTAMP
    WHERE (
        t_datalatest.column_1 IS DISTINCT FROM EXCLUDED.column_1 OR
        t_datalatest.column_2 IS DISTINCT FROM EXCLUDED.column_2 OR
        t_datalatest.column_3 IS DISTINCT FROM EXCLUDED.column_3 OR
        t_datalatest.column_4 IS DISTINCT FROM EXCLUDED.column_4 OR
        t_datalatest.column_5 IS DISTINCT FROM EXCLUDED.column_5 OR
        t_datalatest.column_6 IS DISTINCT FROM EXCLUDED.column_6 OR
        t_datalatest.column_7 IS DISTINCT FROM EXCLUDED.column_7 OR
        t_datalatest.column_8 IS DISTINCT FROM EXCLUDED.column_8 OR
        t_datalatest.column_9 IS DISTINCT FROM EXCLUDED.column_9 OR
        t_datalatest.column_10 IS DISTINCT FROM EXCLUDED.column_10
    );
    
    GET DIAGNOSTICS upsert_count = ROW_COUNT;

    -- 処理結果のサマリーを出力
    RAISE NOTICE '処理件数: %件', total_count;
    RAISE NOTICE 'UPSERT総処理件数: %件', upsert_count;
    RAISE NOTICE 'Aから削除(差分なし): %件', deleted_count;
    
EXCEPTION
    WHEN OTHERS THEN
        RAISE EXCEPTION 'UPSERT版同期処理中にエラーが発生しました: %', SQLERRM;
END;
$$;

プロシージャの特徴

処理について

  1. 差分なしレコードの削除 トランザクションテーブルから完全一致レコードを削除
  2. UPSERT処理 残ったレコードを最新テーブルに挿入/更新
  3. 処理結果の出力 各段階の処理件数をログ出力

重要なSQL構文

構文役割
IS NOT DISTINCT FROMNULL値の比較が可能な構文
INSERT ... ON CONFLICTUPSERT構文
GET DIAGNOSTICS直前のSQL文で処理されたレコード数を取得
RAISE NOTICE処理状況をクライアントに通知

実行SQLの詳細解説

1. 差分なしレコードの削除SQL
DELETE FROM t_data 
USING t_datalatest 
WHERE t_data.pkey_1 = t_datalatest.pkey_1 
  AND t_data.pkey_2 = t_datalatest.pkey_2 
  -- 主キー4つすべて一致
  AND t_data.column_1 IS NOT DISTINCT FROM t_datalatest.column_1
  -- データカラム10個すべて一致(NULL値も考慮)
  • DELETE ... USING構文で2つのテーブルを結合して条件に合うレコードを削除
  • IS NOT DISTINCT FROMにより、NULLNULLも「一致」として扱う
  • 主キー4つ + データカラム10個 = 計14項目すべてが一致する場合のみ削除
2. UPSERT処理のSQL
INSERT INTO t_datalatest (...)
SELECT ... FROM t_data a
ON CONFLICT (pkey_1, pkey_2, pkey_3, pkey_4) 
DO UPDATE SET column_1 = EXCLUDED.column_1, ...
WHERE (
    t_datalatest.column_1 IS DISTINCT FROM EXCLUDED.column_1 OR
    t_datalatest.column_2 IS DISTINCT FROM EXCLUDED.column_2 OR
    -- データ差分がある場合のみ更新実行
);
  • INSERTで新規データの挿入を試行
  • 主キー重複時はON CONFLICTで更新処理に移行
  • EXCLUDEDは挿入しようとしたデータを参照
  • WHERE句で各カラムを比較し、実際に差分がある場合のみ更新を実行

差分検出ロジック

処理条件動作
削除主キー+データ用のカラムが完全一致トランザクション(t_data)から該当レコードを削除
更新主キー一致 + データ差分あり最新テーブル(t_datalatest)を更新
挿入主キーが最新テーブルに存在しない最新テーブル(t_datalatest)に新規挿入

テストと実行結果

サンプルデータの準備

-- サンプルデータの挿入(テスト用)
INSERT INTO t_data (pkey_1, pkey_2, pkey_3, pkey_4, column_1, column_2, column_3, column_4, column_5, column_6, column_7, column_8, column_9, column_10)
VALUES 
    ('A001', 'B001', 'C001', 'D001', 'データ1-1', 'データ1-2', 'データ1-3', 'データ1-4', 'データ1-5', 'データ1-6', 'データ1-7', 'データ1-8', 'データ1-9', 'データ1-10'),
    ('A002', 'B002', 'C002', 'D002', 'データ2-1', 'データ2-2', 'データ2-3', 'データ2-4', 'データ2-5', 'データ2-6', 'データ2-7', 'データ2-8', 'データ2-9', 'データ2-10'),
    ('A003', 'B003', 'C003', 'D003', 'データ3-1', 'データ3-2', 'データ3-3', 'データ3-4', 'データ3-5', 'データ3-6', 'データ3-7', 'データ3-8', 'データ3-9', 'データ3-10')
ON CONFLICT DO NOTHING;

INSERT INTO t_datalatest (pkey_1, pkey_2, pkey_3, pkey_4, column_1, column_2, column_3, column_4, column_5, column_6, column_7, column_8, column_9, column_10)
VALUES 
    ('A001', 'B001', 'C001', 'D001', 'データ1-1', 'データ1-2', 'データ1-3', 'データ1-4', 'データ1-5', 'データ1-6', 'データ1-7', 'データ1-8', 'データ1-9', 'データ1-10'),
    ('A002', 'B002', 'C002', 'D002', '古いデータ2-1', '古いデータ2-2', '古いデータ2-3', '古いデータ2-4', '古いデータ2-5', '古いデータ2-6', '古いデータ2-7', '古いデータ2-8', '古いデータ2-9', '古いデータ2-10')
ON CONFLICT DO NOTHING;

実行例

実行前のテーブル状態:

-- 実行前確認
SELECT 'トランザクションテーブル件数' as label, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル件数', COUNT(*) FROM t_datalatest;
labelcount
トランザクションテーブル件数3
最新データテーブル件数2

プロシージャ実行:

CALL sync_t_data_to_t_datalatest_upsert_correct();

実行ログ:

NOTICE:  差分チェック処理を開始します...
NOTICE:  処理件数: 3件
NOTICE:  UPSERT総処理件数: 2件
NOTICE:  Aから削除(差分なし): 1件
CALL

実行後のテーブル状態:

-- 実行後確認
SELECT 'トランザクションテーブル件数' as label, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル件数', COUNT(*) FROM t_datalatest;
labelcount
トランザクションテーブル件数2
最新データテーブル件数3

処理結果の詳細

  1. A001 - 完全一致のため、t_dataから削除
  2. A002 - データ差分ありのため、t_datalatestを更新
  3. A003 - 新規データのため、t_datalatestに挿入

参考

おわりに

今回は、PL/pgSQLでテーブル間の差分検出・同期処理を実装してみた。 C#の場合はエンティティなどを作成する必要があるが、プロシージャの場合はSQLを組み合わせれば実装できるため比較的楽に感じる。 ただし、プロシージャのデバッグが大変なので、デバッグや単体テストについて良い方法がないかを今後調べてみる予定である。

Hugo で構築されています。
テーマ StackJimmy によって設計されています。