2つのテーブルのレコードを差分チェックして最新データを保持するバッチを作る

はじめに

業務でよくあるのが、大量のトランザクションデータが日々流れ込んでくるシステムから最新データだけを取り出したい。
そこで今回は、PostgreSQLC# .NET 8を使って、2つのテーブル間で差分をチェックして最新データをもう一つのテーブルに登録/更新するバッチ処理を作ってみた。

GitHub

今回作成したプログラムのソースコードは以下のリポジトリで公開している。

https://github.com/katsuobushiFPGA/CheckDiffTable

このプログラムを実行するとどうなるか

このバッチ処理を実行すると、以下のような動作をする。

  1. トランザクションテーブルから全データを取得
  2. 最新データテーブルの現在の内容と比較
  3. 差分がある場合は最新データテーブルを更新(新規登録 or 更新)
  4. 差分がない場合はトランザクションデータを削除(クリーンアップ)
  5. 処理結果をログ出力

例えば、以下のような流れになる。

# 初回実行時
新規登録: 5件 → エンティティID 1001-1005が最新データテーブルに追加される

# 2回目実行(差分なし)
スキップ: 5件, 削除: 5件 → 同じデータのためトランザクションテーブルから削除

# エンティティ1001のデータが更新された場合
更新: 1件 → `latest_data_table``entity_id=1001`のレコードを更新
削除: 4件 → 差分のなかった残り4件を`transaction_table`から削除

プログラムの実行方法

前提条件

PostgreSQLデータベースが準備されていること。

-- トランザクションテーブル(複合主キー設計)
CREATE TABLE transaction_table (
    id SERIAL PRIMARY KEY,
    entity_id INTEGER NOT NULL,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    status VARCHAR(20) NOT NULL,
    amount DECIMAL(10,2),
    transaction_type VARCHAR(20) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 最新データテーブル(複合主キー設計)
CREATE TABLE latest_data_table (
    id INTEGER NOT NULL,
    entity_id INTEGER NOT NULL,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    status VARCHAR(20) NOT NULL,
    amount DECIMAL(10,2),
    transaction_type VARCHAR(20) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, entity_id)  -- 複合主キー
);

-- インデックス作成
CREATE INDEX idx_transaction_entity_id_created_at ON transaction_table(entity_id, created_at DESC);
CREATE INDEX idx_latest_data_composite_key ON latest_data_table(id, entity_id);
CREATE INDEX idx_latest_data_entity_id ON latest_data_table(entity_id);

実行手順

  1. 設定ファイル編集
{
  "ConnectionStrings": {
    "DefaultConnection": "Host=localhost;Database=your_db;Username=user;Password=pass;Timeout=30;Command Timeout=60;Timezone=Asia/Tokyo"
  },
  "BatchProcessing": {
    "BatchSize": 1000
  }
}
  1. バッチ実行
# プロジェクトディレクトリで実行
dotnet run

# または発行済みバイナリで実行
./CheckDiffTable

プログラムの概要

使用技術とアーキテクチャ

このプログラムは以下の技術を使って構築している。

メイン技術スタック

技術説明
C# .NET 8.0高性能なクロスプラットフォーム開発環境
PostgreSQL高性能データベース(複合主キー対応)
NpgsqlPostgreSQL用データアクセスライブラリ(NpgsqlDataSource使用)
Microsoft.ExtensionsDI・Configuration・Logging統合機能

アーキテクチャパターン

パターン説明
Repository Patternデータアクセスの統一
Dependency Injection疎結合設計
Batch Processing Pattern効率的な大量データ処理
Options Pattern設定管理

システム構成・シーケンス図

以下の図で処理の流れとデータフローを示す。

DFD・ER図

テーブル間の関係とエンティティ構造は以下の通り。

設計のポイント

  • transaction_tableは複合主キー(id, entity_id
  • latest_data_tableは複合主キー(id, entity_id
  • 1つのエンティティに対して複数のトランザクションが存在可能
  • バッチ処理で最新データのみを抽出・更新

テストと実行結果

各テストケースごとに、実際にSQLを実行してからバッチプログラムを動かしてみる。

テストケース1: 初回実行時(新規データのみ)

まず、テストケース1のSQLを実行してトランザクションデータを用意する。

-- ===== テストケース1: 新規データのみ =====
INSERT INTO transaction_table (id, entity_id, name, description, status, amount, transaction_type, created_at, updated_at) VALUES
(1, 1, '商品A', '商品Aの説明', 'active', 1000.00, 'sale', '2025-01-01 10:00:00', '2025-01-01 10:00:00'),
(2, 2, '商品B', '商品Bの説明', 'active', 2000.00, 'sale', '2025-01-01 12:00:00', '2025-01-01 12:00:00'),
(3, 3, '商品C', '商品Cの説明', 'inactive', 500.00, 'purchase', '2025-01-01 13:00:00', '2025-01-01 13:00:00'),
(4, 4, '商品D', '商品Dの説明', 'active', 3000.00, 'sale', '2025-01-02 16:00:00', '2025-01-02 16:00:00');

この状態でバッチプログラムを実行すると、全てのトランザクションデータが最新データテーブルに登録される。

実行前のテーブル状態:

transaction_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A商品Aの説明active1000.00sale
22商品B商品Bの説明active2000.00sale
33商品C商品Cの説明inactive500.00purchase
44商品D商品Dの説明active3000.00sale

latest_data_table

identity_idnamedescriptionstatusamounttransaction_type
(空のテーブル)

実行後のテーブル状態:

transaction_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A商品Aの説明active1000.00sale
22商品B商品Bの説明active2000.00sale
33商品C商品Cの説明inactive500.00purchase
44商品D商品Dの説明active3000.00sale

latest_data_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A商品Aの説明active1000.00sale
22商品B商品Bの説明active2000.00sale
33商品C商品Cの説明inactive500.00purchase
44商品D商品Dの説明active3000.00sale
=== トランザクション差分チェック・更新システム開始 ===
一括処理モード - デフォルトバッチサイズ
バッチ処理開始: バッチサイズ 1000
トランザクション取得完了: 4件
トランザクション対象エンティティ数: 4件
トランザクション内一括UPSERT実行完了: 4エンティティ, 4行処理
一括UPSERT完了: 4レコード(新規:4件, 更新:0件)
全バッチ処理完了(差分なしトランザクションの削除を含む)
一括処理完了: 4エンティティ処理 (新規:4, 更新:0, スキップ:0, エラー:0) 削除:0件 処理時間:168ms
=== 一括処理結果サマリー ===
処理成功: True
総エンティティ数: 4
新規登録: 4件
更新: 0件
スキップ: 0件
削除: 0件
エラー: 0件
処理時間: 168.4099ms
メッセージ: 一括処理完了: 4エンティティ処理 (新規:4, 更新:0, スキップ:0, エラー:0) 削除:0件 処理時間:168ms
=== 詳細結果 ===
EntityID: 1, Action: Insert, Success: True, Message: 新規データ登録, TransactionID: 1
EntityID: 2, Action: Insert, Success: True, Message: 新規データ登録, TransactionID: 2
EntityID: 3, Action: Insert, Success: True, Message: 新規データ登録, TransactionID: 3
EntityID: 4, Action: Insert, Success: True, Message: 新規データ登録, TransactionID: 4
=== トランザクション差分チェック・更新システム終了 ===

テストケース2: 差分ありデータの更新テスト

次に、テストケース2のSQLを実行してデータを更新する。

-- ===== テストケース2: 差分ありデータの更新 =====
TRUNCATE TABLE transaction_table;

INSERT INTO transaction_table (id, entity_id, name, description, status, amount, transaction_type, created_at, updated_at) VALUES
(1, 1, '商品A更新版', '商品Aの更新された説明', 'active', 1500.00, 'sale', '2025-01-02 11:00:00', '2025-01-02 11:00:00'),
(2, 2, '商品B更新版', '商品Bの更新説明', 'inactive', 2500.00, 'sale', '2025-01-02 13:00:00', '2025-01-02 13:00:00'),
(5, 5, '商品E', '新しい商品Eの説明', 'active', 4000.00, 'sale', '2025-01-02 14:00:00', '2025-01-02 14:00:00');

この状態でバッチプログラムを実行すると、以下のような結果になる。

実行前のテーブル状態:

transaction_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A更新版商品Aの更新された説明active1500.00sale
22商品B更新版商品Bの更新説明inactive2500.00sale
55商品E新しい商品Eの説明active4000.00sale

latest_data_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A商品Aの説明active1000.00sale
22商品B商品Bの説明active2000.00sale
33商品C商品Cの説明inactive500.00purchase
44商品D商品Dの説明active3000.00sale

実行後のテーブル状態:

transaction_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A更新版商品Aの更新された説明active1500.00sale
22商品B更新版商品Bの更新説明inactive2500.00sale
55商品E新しい商品Eの説明active4000.00sale

latest_data_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A更新版商品Aの更新された説明active1500.00sale
22商品B更新版商品Bの更新説明inactive2500.00sale
33商品C商品Cの説明inactive500.00purchase
44商品D商品Dの説明active3000.00sale
55商品E新しい商品Eの説明active4000.00sale
=== トランザクション差分チェック・更新システム開始 ===
一括処理モード - デフォルトバッチサイズ
バッチ処理開始: バッチサイズ 1000
トランザクション取得完了: 3件
トランザクション対象エンティティ数: 3件
トランザクション内一括UPSERT実行完了: 3エンティティ, 3行処理
一括UPSERT完了: 3レコード(新規:1件, 更新:2件)
全バッチ処理完了(差分なしトランザクションの削除を含む)
一括処理完了: 3エンティティ処理 (新規:1, 更新:2, スキップ:0, エラー:0) 削除:0件 処理時間:186ms
=== 一括処理結果サマリー ===
処理成功: True
総エンティティ数: 3
新規登録: 1件
更新: 2件
スキップ: 0件
削除: 0件
エラー: 0件
処理時間: 186.3512ms
メッセージ: 一括処理完了: 3エンティティ処理 (新規:1, 更新:2, スキップ:0, エラー:0) 削除:0件 処理時間:186ms
=== 詳細結果 ===
EntityID: 1, Action: Update, Success: True, Message: データ更新, TransactionID: 1
EntityID: 2, Action: Update, Success: True, Message: データ更新, TransactionID: 2
EntityID: 5, Action: Insert, Success: True, Message: 新規データ登録, TransactionID: 5
=== トランザクション差分チェック・更新システム終了 ===

テストケース3: 差分なし・一部更新の混在テスト

続いて、テストケース3のSQLを実行する。

-- ===== テストケース3: 差分なしデータ(スキップ&削除) =====
TRUNCATE TABLE transaction_table;

INSERT INTO transaction_table (id, entity_id, name, description, status, amount, transaction_type, created_at, updated_at) VALUES
(1, 1, '商品A更新版', '商品Aの更新された説明', 'active', 1500.00, 'sale', '2025-01-03 10:00:00', '2025-01-03 10:00:00'),  -- 差分なし
(2, 2, '商品B最終版', '商品Bの最終説明', 'active', 2800.00, 'return', '2025-01-03 11:00:00', '2025-01-03 11:00:00'),  -- 差分あり
(6, 6, '商品F', '新商品Fの説明', 'active', 5000.00, 'sale', '2025-01-03 12:00:00', '2025-01-03 12:00:00');             -- 新規

この状態でバッチプログラムを実行すると、以下の結果になる。

実行前のテーブル状態:

transaction_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A更新版商品Aの更新された説明active1500.00sale
22商品B最終版商品Bの最終説明active2800.00return
66商品F新商品Fの説明active5000.00sale

latest_data_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A更新版商品Aの更新された説明active1500.00sale
22商品B更新版商品Bの更新説明inactive2500.00sale
33商品C商品Cの説明inactive500.00purchase
44商品D商品Dの説明active3000.00sale
55商品E新しい商品Eの説明active4000.00sale

実行後のテーブル状態:

transaction_table

identity_idnamedescriptionstatusamounttransaction_type
22商品B最終版商品Bの最終説明active2800.00return
66商品F新商品Fの説明active5000.00sale

latest_data_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A更新版商品Aの更新された説明active1500.00sale
22商品B最終版商品Bの最終説明active2800.00return
33商品C商品Cの説明inactive500.00purchase
44商品D商品Dの説明active3000.00sale
55商品E新しい商品Eの説明active4000.00sale
66商品F新商品Fの説明active5000.00sale
=== トランザクション差分チェック・更新システム開始 ===
一括処理モード - デフォルトバッチサイズ
バッチ処理開始: バッチサイズ 1000
トランザクション取得完了: 3件
トランザクション対象エンティティ数: 3件
トランザクション内一括UPSERT実行完了: 2エンティティ, 2行処理
一括UPSERT完了: 2レコード(新規:1件, 更新:1件)
削除完了: 1件
差分なしトランザクション削除完了: 1件
全バッチ処理完了(差分なしトランザクションの削除を含む)
一括処理完了: 3エンティティ処理 (新規:1, 更新:1, スキップ:1, エラー:0) 削除:1件 処理時間:150ms
=== 一括処理結果サマリー ===
処理成功: True
総エンティティ数: 3
新規登録: 1件
更新: 1件
スキップ: 1件
削除: 1件
エラー: 0件
処理時間: 150.4469ms
メッセージ: 一括処理完了: 3エンティティ処理 (新規:1, 更新:1, スキップ:1, エラー:0) 削除:1件 処理時間:150ms
=== 詳細結果 ===
EntityID: 1, Action: None, Success: True, Message: 差分なし - スキップ, TransactionID: 1
EntityID: 2, Action: Update, Success: True, Message: データ更新, TransactionID: 2
EntityID: 6, Action: Insert, Success: True, Message: 新規データ登録, TransactionID: 6
=== トランザクション差分チェック・更新システム終了 ===

テストケース4: 大量データでのパフォーマンステスト

最後に、テストケース4のSQLを実行して大量データをテストする。

-- ===== テストケース4: 大量データテスト =====
TRUNCATE TABLE transaction_table;

INSERT INTO transaction_table (id, entity_id, name, description, status, amount, transaction_type, created_at, updated_at) 
SELECT 
    s as id,
    s as entity_id,
    'テスト商品' || s as name,
    'テスト説明' || s as description,
    CASE WHEN s % 2 = 0 THEN 'active' ELSE 'inactive' END as status,
    (s * 100.0) as amount,
    CASE WHEN s % 3 = 0 THEN 'sale' ELSE 'purchase' END as transaction_type,
    CURRENT_TIMESTAMP as created_at,
    CURRENT_TIMESTAMP as updated_at
FROM generate_series(1, 1000) as s;

この状態でバッチプログラムを実行すると、以下のような結果になる。

実行前のテーブル状態:

transaction_table

identity_idnamedescriptionstatusamounttransaction_type
11テスト商品1テスト説明1inactive100.00purchase
22テスト商品2テスト説明2active200.00purchase
33テスト商品3テスト説明3inactive300.00sale
10001000テスト商品1000テスト説明1000active100000.00purchase

latest_data_table

identity_idnamedescriptionstatusamounttransaction_type
11商品A更新版商品Aの更新された説明active1500.00sale
22商品B最終版商品Bの最終説明active2800.00return
33商品C商品Cの説明inactive500.00purchase
44商品D商品Dの説明active3000.00sale
55商品E新しい商品Eの説明active4000.00sale
66商品F新商品Fの説明active5000.00sale

実行後のテーブル状態:

transaction_table

identity_idnamedescriptionstatusamounttransaction_type
11テスト商品1テスト説明1inactive100.00purchase
22テスト商品2テスト説明2active200.00purchase
33テスト商品3テスト説明3inactive300.00sale
10001000テスト商品1000テスト説明1000active100000.00purchase

latest_data_table

identity_idnamedescriptionstatusamounttransaction_type
11テスト商品1テスト説明1inactive100.00purchase
22テスト商品2テスト説明2active200.00purchase
33テスト商品3テスト説明3inactive300.00sale
10001000テスト商品1000テスト説明1000active100000.00purchase

パフォーマンステスト結果

大量データでのテスト結果を以下に示す。

=== トランザクション差分チェック・更新システム開始 ===
一括処理モード - デフォルトバッチサイズ
バッチ処理開始: バッチサイズ 1000
トランザクション取得完了: 1000件
トランザクション対象エンティティ数: 1000件
トランザクション内一括UPSERT実行完了: 1000エンティティ, 1000行処理
一括UPSERT完了: 1000レコード(新規:994件, 更新:6件)
全バッチ処理完了(差分なしトランザクションの削除を含む)
一括処理完了: 1000エンティティ処理 (新規:994, 更新:6, スキップ:0, エラー:0) 削除:0件 処理時間:327ms
=== 一括処理結果サマリー ===
処理成功: True
総エンティティ数: 1000
新規登録: 994件
更新: 6件
スキップ: 0件
削除: 0件
エラー: 0件
処理時間: 327.16ms
メッセージ: 一括処理完了: 1000エンティティ処理 (新規:994, 更新:6, スキップ:0, エラー:0) 削除:0件 処理時間:327ms
=== 詳細結果 ===
EntityID: 1, Action: Update, Success: True, Message: データ更新, TransactionID: 1
...

データ確認用のクエリ

各テストケースの実行後、以下のクエリでデータを確認できる。

-- トランザクションテーブルの確認
SELECT * FROM transaction_table ORDER BY entity_id, created_at;

-- 最新データテーブルの確認
SELECT * FROM latest_data_table ORDER BY entity_id;

参考

おわりに

今回は、C#でデータベースの差分チェックバッチを実装してみた。

このバッチだが、GitHub Copilotを使ってvibe codingで作っており、レビューを何回もやって実行できる形になった。
自分の手はほとんど動かすことはなかったが、プログラムにおかしいところないかを目で見てチェックするフェーズしかなかったので目が疲れた…。

後は、C#についてあまり詳しくないのでAIが実装した形でいいのかという部分が少しあった。
→DB接続の部分とか、ディレクトリの切り分け方とか、アーキテクチャの実装とか…。

とはいえ、参考になる部分もあり、Linqの使い方とかはなるほどな~となったのでこれは取り入れていきたい。
vibe codingも勉強しつつ、C#も上達できるとよいなという気持ちで作ってみた。

Hugo で構築されています。
テーマ StackJimmy によって設計されています。