有名テック企業の技術ブログを、ひとつのフィードで。
フィード
112件
こんにちは。メルペイでソフトウェアエンジニアをしている @sapuri です。この記事は Merpay & Mercoin Tech Openness Month 2026 の 9日目の記事です。 はじめに 本記事は、2026年4月27日の Background Job Talk 〜 Temporal 活用と独自実装の舞台裏編〜 で発表した「内製ワークフローエンジンの設計とメルカリでの活用事例」を記事化したものです。 マイクロサービスアーキテクチャのような分散システムでは、複数のサービスにまたがる処理のデータ整合性をどう保つか、いわゆる分散トランザクションの扱いが大きな課題となります。 メルカリでは、この課題を Saga パターンによる結果整合性で解決するために、自社でワークフローエンジンを開発して運用しています。 このワークフローエンジンは、もともとメルコインの決済基盤における分散トランザクション管理のために開発したものです。メルペイの Payment Service で得た知見も取り入れながら設計し、現在はメルカリグループ内の複数のユースケースで利用が広がっています。 この記事では、内製に至った背景とワークフローエンジンの具体的な設計、社内での活用事例について紹介します。 分散トランザクション管理の課題と Saga パターン メルカリでは主にマイクロサービスアーキテクチャを採用しています。 そのため、お客さまがアプリで1つの操作をすると、そのリクエストは基本的に複数のサービスをまたいで処理されます。 例えば、メルカリアプリからビットコインを購入するときの決済リクエストでは、取引データの作成、メルコインの日本円残高の減算、メルペイのポイントの減算、ビットコイン残高の加算、取引データの更新といった複数の処理が関わります。 この決済リクエストは、1つのトランザクションとして扱う必要があります。つまり、一連の処理をすべて成功させるか、すべて失敗させるかのどちらかに寄せる必要があります。 しかし、各サービスがそれぞれデータベースを持っているため、単純にロールバックすることはできません。この点を考慮せずに実装すると、エラーのタイミングによってデータの不整合が発生します。 例えば、このような不整合が起こりえます。 決済が失敗したのにメルコインの日本円残高が減っている 残高は減ったがビットコイン残高が加算されない ビットコインと交換できているのに取引が完了扱いになっていない また、Two-Phase Commit のような分散トランザクションでは長期間リソースをロックするため、サービスの可用性が下がる可能性があります。 そのため、メルカリでは結果整合性のアプローチで、このような分散トランザクションを解決しています。 Saga この結果整合性を実現するためのアーキテクチャの1つとして、Saga というパターンがあります。 Saga は、トランザクションを複数の小さなトランザクションに分割して順次実行することで長時間のロックを不要にします。途中でリトライ不可能なエラーが出た場合は、成功済みの処理に対する補償トランザクションを逆順で実行します。 先ほどの暗号資産購入の例で、途中のビットコイン残高を増やす処理でリトライできないエラーが発生した場合を考えます。 この場合、この時点までに成功した処理を取り消す補償トランザクションを逆順に実行します。すでにメルコインの残高とメルペイのポイントが減らされているので、まずポイントを戻し、その次にメルコイン残高を戻し、最後に取引データを失敗として更新します。 このように実装することで、途中のどこで失敗しても結果整合性を保って処理を完了させることができます。 このあたりの話は以前の記事でも紹介しているので、興味のある方はそちらもぜひご覧ください。 メルコイン決済基盤における分散トランザクション管理 | メルカリエンジニアリング ワークフローエンジンの検討 実装の方針が決まったので、実際に Saga パターンを実装するためにワークフローエンジンの導入を検討しました。 主に検討したツールは、GCP Workflows、Cadence、Temporal です。メルカリでは主に GCP を使ってサービスを構築しているため、まず GCP Workflows を検討しました。ただ、各処理を HTTP のエンドポイントとして実装する必要があり、ユニットテストがやりにくいという懸念がありました。また、YAML ではなく Go のコードでワークフローを記述したいという要望もありました。 Cadence と Temporal も検討しましたが、メルカリでは Cloud Spanner をメインに使っているため、Spanner に対応していなかったことから採用できませんでした。また、Temporal はシステムの規模が大きく、仕組みも比較的複雑なため、運用面にも不安がありました。 このように、既存ツールでは要件を満たせなかったため、自社でワークフローエンジンを開発することにしました。 開発時には、Cadence / Temporal のインターフェースの良さを取り入れつつ、メルペイの Payment Service ですでに実績があった「DB への実行状態の永続化 x インメモリキュー x Worker での実行管理」のアーキテクチャを再利用する方針にしました。また、Go 専用で必要な機能のみに絞ることで、数人の兼務メンテナーでも運用できる規模にしています。 ワークフローエンジンの設計 アーキテクチャ このワークフローエンジンは、アプリケーションサーバーと同じ Pod でデプロイされることを想定しています。 Go runtime で動作し、利用者は SDK として扱います。 アプリケーションは Manager というインターフェースを使ってワークフローエンジンを操作します。主に Register と Execute という2種類のインターフェースを使います。 アプリケーションはワークフローを普通の Go の関数として実装するので、その関数の内容を事前にワークフローエンジンに登録する必要があります。 manager.RegisterWorkflow() が呼び出されると、Manager は Registry というインメモリの領域に関数を格納します。 manager.Workflow().Execute() は、実際にワークフローを実行するインターフェースです。 呼び出されると、Manager は Engine Server という gRPC サーバーに対して Workflow や Activity を作成するリクエストを送ります。 Engine Server は関数名や引数、実行状態を DB に保存し、インメモリキューである Channel に WorkflowStarted イベントを publish します。 その後、Worker という goroutine が WorkflowStarted イベントを subscribe し、Registry から実行する関数を取得して、Go のリフレクションを使って実行します。 実行が完了すると Worker は Engine Server に完了を報告し、Engine Server は結果を保存して WorkflowCompleted イベントを publish します。 その後、Worker が WorkflowCompleted イベントを subscribe し、アプリケーションに関数の実行結果を返却します。 もしその結果がエラーだった場合は、後述する ErrorMarshaler というインターフェースで、その Workflow を完了させるかどうかを判定します。 ここまで説明したコンポーネントの役割を整理します。 Manager: SDK のエントリーポイント。アプリケーションは Workflow()、Activity()、RegisterWorkflows() などを呼び出します。 Engine Server: Create、Complete、List などの gRPC API を提供するサーバー。DB に Workflow や Activity の I/O と状態を保存します。 Channel: Workflow や Activity の状態遷移イベントのハブとなるインメモリキュー。 Workers: Workflow や Activity を実行する goroutine 群。Channel から状態遷移イベントを購読し、イベントの種別に応じた処理を実行します。 Registry: Register された関数をインメモリで保持します。 Recovery Worker: Engine Server に対して定期的に未完了の Workflow と Activity を List してリトライします。 コードサンプル アプリケーション側の実装イメージは次のようになります。 func (s *Service) createExchangeWorkflow(ctx context.Context, params *CreateExchangeParams) (*CreateExchangeResult, error) { saga := workflow.NewSaga(s.wm) if err := s.wm.Activity(s.authorizeBalance, params.Balance).ExecuteWait(ctx); err != nil { return nil, err } saga.AddCompensation(s.cancelBalance, params.Balance) if err := s.wm.Activity(s.authorizePoint, params.Point).ExecuteWait(ctx); err != nil { if !isCompletableError(err) { return nil, err } if cerr := saga.Execute(ctx, func(e execution.Execution) error { return e.Wait(ctx) }); cerr != nil { return nil, fmt.Errorf("failed to execute compensation activities: %w, orig_err: %v", cerr, err) } return nil, err } return &CreateExchangeResult{}, nil } まず、createExchangeWorkflow という関数が定義されています。 この関数は、残高を確保する authorizeBalance という Activity と、ポイントを確保する authorizePoint という Activity を順に実行して結果を返す処理です。 特徴的なのは、それぞれの Activity を実行した直後に、この SDK が提供する Saga の AddCompensation インターフェースで補償トランザクションを登録している点です。 これにより、authorizeBalance Activity が成功した後に authorizePoint Activity が失敗した場合は、authorizeBalance を取り消す処理である cancelBalance という関数が補償トランザクションとして実行されます。 エラーハンドリング このワークフローエンジンでは、3種類のエラーを定義しています。 Completable Error: Workflow や Activity を失敗として完了させてよい、想定されたエラーです。例として、残高不足や利用制限があります。 Retryable Error: リトライ対象のエラーです。 Incompletable Error: Workflow を完了させずに停止し、Recovery Worker が後でリトライするエラーです。 Completable Error は、明示的に完了できるエラーだけを完了扱いにするための仕組みです。 クライアント側で ErrorMarshaler というインターフェースを実装したエラーとして定義される 該当しないエラーはすべて未完了として実行を停止し、Recovery Worker によってリトライされる 明示的に Completable Error を返さない限り Workflow は完了しない アプリケーションが意図していない異常な状態で Workflow が完了しない設計になる type ErrorMarshaler interface { MarshalCompletableError(error) ([]byte, error) UnmarshalCompletableError(marshaledErr []byte) error } 具体例として、ドメインのカスタムエラー型に Completable() というメソッドを定義し、それを使って ErrorMarshaler を実装します。 このようなカスタムエラー型を作っておくことで、ビジネスロジックで特定のエラーコードを含むエラーを返すと、ワークフローエンジンで完了可能なエラーとして処理されます。 type Error struct { code ErrorCode msg string } func (e *Error) Error() string { return e.msg } func (e *Error) Completable() bool { return e.code == ErrCodeCompletable } type workflowError struct { Code ErrorCode Msg string } type workflowErrorMarshaler struct{} func (workflowErrorMarshaler) MarshalCompletableError(err error) ([]byte, error) { var aerr *Error if !errors.As(err, &aerr) { return nil, err } if !aerr.Completable() { return nil, err } return json.Marshal(&workflowError{ Code: aerr.code, Msg: aerr.msg, }) } func (workflowErrorMarshaler) UnmarshalCompletableError(data []byte) error { var werr workflowError if err := json.Unmarshal(data, &werr); err != nil { return err } return &Error{ code: werr.Code,
こんにちは。メルコインのフロントエンド(FE)エンジニアとしてインターンをしている@nanacomです。この記事は「Merpay & Mercoin Tech Openness Month 2026」の7日目の記事です。 はじめに インターンではFEに限らず、要件定義からバックエンド(BE)開発まで、1つのプロジェクトに幅広く取り組みました。その中で、メルコインの社内ツールを開発する際に、2つのAPIの結果を日時降順にマージして返すエンドポイントを実装するケースに直面しました。 結果を結合して並べ替えるだけならシンプルですが、マージした一覧にもページネーションを提供しようとすると、各ソースのカーソルをどこまで進めるべきかが複雑になります。本記事では、「マージ結果として採用された件数」と「各データソース側で進めるべきカーソル」のズレにどう対処したかを紹介します。具体的には、データ取得とカーソル確定を分離する「2フェーズ取得パターン」と、各ソースのカーソルを1つのトークンに束ねる「複合ページネーショントークン」の2つの設計を取り上げます。 前提:対象とするユースケース マイクロサービスアーキテクチャでは、BFF(Backend For Frontend)で複数のサービスからデータを集約して一覧表示することがよくあります。今回対象としたのは、2つの独立したデータソース(A, B)のデータをマージするケースです。いずれも日時降順にソートされたデータを返し、それぞれがカーソルベースのページネーションAPIを提供しています。カーソルベースのページネーションとは、前回の取得結果の末尾を示すトークン(カーソル)を次のリクエストに渡すことで、続きのデータを取得する方式です。 この2つのソースの結果を日時降順にマージした一覧をクライアントに返しつつ、その一覧自体にもページネーションを提供する必要がありました。つまり、各ソースが独立して管理するカーソルを、BFF側でどう扱うかが設計上の焦点でした。 売買と入出金をマージした一覧表示(※表示データはすべてダミーです) 素朴なアプローチとその限界 この設計上の焦点に対して、私たちはまず2つの素朴なアプローチを検討しました。いずれも限界があり、最終的な設計への動機となりました。 アプローチ1:全件取得してソート 最も単純な方法は、両ソースから全件を取得し、アプリケーション側でソートしてからページごとに切り出す方法です。しかし、データ数が増えるとメモリ使用量とレイテンシーが線形に増加するため、スケールしません。 アプローチ2:各ソースからpageSize件取得してマージ 各ソースからそれぞれ pageSize 件を取得し、マージして上位 pageSize 件を選択する方法です。データ取得量を抑えられるため現実的ですが、ここで1つの問題が発生します。 例として pageSize=5 のとき、Aから [A1..A5]、Bから [B1..B5] が返ってきたとします(いずれも日時降順)。これらをマージして上位5件を作ると、マージ結果に含まれるのがAから3件(A1,A2,A3)、Bから2件(B1,B2)になるとします。 次のページでは本来、AはA4から、BはB3から取得を再開する必要があります。しかし各ソースAPIが返すカーソルは「返却リスト末尾の次」を指すため、手元のカーソルはA6(= Aを5件進めた次)やB6(= Bを5件進めた次)を指してしまいます。マージ結果に必要な再開位置(A4/B3)と、手元のカーソル(A6/B6)が一致しません。 (図1)各ソースから取得 (pageSize=5) Source A: [A1][A2][A3][A4][A5] -> cursorA = A6 Source B: [B1][B2][B3][B4][B5] -> cursorB = B6 マージして上位5件を採用すると、実際に消費したのは Aが3件 / Bが2件 になります(採用: A1 A2 A3 / B1 B2)。 このとき次ページで「本当に再開したい位置」と「手元のカーソル」がズレます。 ソース 次ページで本当は 手元のカーソル A A4 から再開 A6 を指す B B3 から再開 B6 を指す これが、本記事で解決する核心的な課題です。次のセクションでは、この課題に対して理想的にはどう解決すべきかを考え、そのうえで私たちが採った設計方針を説明します。 理想の解決策と現実の制約 カーソルベースAPIでは、返却件数とカーソルの進行量が常に一致します。pageSize=5 でリクエストすれば5件返り、カーソルも5件分進みます。しかし今回のように複数ソースのデータをマージするケースでは、5件取得しても実際に採用するのは一部だけです。この「取得件数」と「消費件数」のズレが根本原因です。 仮に各ソースのAPIがカーソルではなくタイムスタンプによる範囲指定をサポートしており、かつソース内のタイムスタンプが一意であれば、この問題は発生しません。例えば、以下のように、マージ結果で最後に消費したアイテムの日時を基準に次ページを取得できます。 GET /orders?before=2025-01-01T10:00:00Z&limit=5 GET /transfers?before=2025-01-01T10:00:00Z&limit=5 この方式であれば、各ソースの消費済み最終タイムスタンプを1つのトークンに含めるだけで、BFF側に状態を持たずに1回のリクエストでページネーションを実現できます。また before で過去方向に切るため、新しいデータが追加されてもページ跨ぎの重複が起きません。 しかし、各マイクロサービスのAPI仕様を変更するのは現実的ではないため、既存仕様のままBFF層で解決する方法を検討しました。 設計方針の決定 BFF層での解決策として、トークンへの情報埋め込み、サーバー側キャッシュ、データ取得とカーソル確定の分離という3つの方法を検討しました。設計のシンプルさとステートレス性を重視した結果、3つ目の「2フェーズ取得」方式を採用しました。 方法1:トークンに情報を詰め込む(拡張複合トークン) 各ソースのカーソルを1つのトークンに束ねて返す際に、カーソルだけでなく、次ページを再開するために必要な情報をまるごとトークン内に埋め込む設計です。例えば「Aから何件/Bから何件消費したか」のようなメタ情報も含め、JSONにまとめてBase64エンコードして返します。 { "cursorA": "abc123", "cursorB": "def456", "consumedA": 3, "consumedB": 2 } この方式だと、クライアントが次のリクエストでトークンをそのまま返すことで、サーバーはトークンをデコードするだけで「次ページの再開位置(A4/B3など)」を復元できます。 しかし、既存のソースAPIがカーソルベースの仕組みを提供している中で独自にオフセット等も管理すると、「カーソルの意味」が二重になり設計が複雑化するため、採用しませんでした。 方法2:Redisなどで「使わなかったデータ」を保持する(サーバー側キャッシュ) 各ソースから pageSize 件ずつ取得してマージした結果、採用されなかった"余り"のデータ(例:A4, A5 / B3, B4, B5)をサーバー側で保持しておく設計です。例えばユーザー(またはリクエスト)単位のセッションキーでRedisに格納します。 session:user123 → { unusedA: [A4, A5], unusedB: [B3, B4, B5] } 次のページのリクエストが来たら、 まずRedisに残っているデータを先に使ってマージし 足りない分だけ各ソースAPIから追加取得する という流れにすれば、カーソルのズレ問題を回避できます。 しかし、サーバー側に状態を持つことになり、社内ツールの規模に対してインフラの運用コストが見合わないため、採用しませんでした。 方法3(採用):データ取得とカーソル確定を分離する(2フェーズ取得) 上記2つの方法では、1回のAPI呼び出しでデータ取得とカーソル確定を同時に済ませようとしています。発想を変え、データを取得してマージするフェーズと、消費件数に基づいてカーソルを確定するフェーズを分けることで、この問題を解決します。サーバーはステートレスのまま、既存APIの仕組みをそのまま活かせます。API呼び出し回数は増えますが、最もシンプルな設計です。 許容するトレードオフ ただし、この方式では2回のAPI呼び出しの間に多少の時間差が生じます。そのわずかな間に対象データが追加された場合、次ページに重複したデータが現れる可能性があります。 私たちはこの問題を、以下の理由から許容可能なトレードオフと判断しました。 影響は「ページを跨ぐ際の重複表示」に限定される 対象がリアルタイムに頻繁に更新されるデータではないため、発生頻度は低い 完全な整合性を保証するには、各ソースのAPI仕様変更が必要になり、コストに見合わない この判断のもと、以降のセクションで方法3の具体的な実装を説明します。 2フェーズ取得パターン 前のセクションで述べた方法3を、具体的にどう実装したかを説明します。データを取得してマージするフェーズと、消費件数に対応するカーソルを確定するフェーズに分けて設計しました。 フェーズ1:取得とマージ ソースA、ソースBからそれぞれ pageSize 件を並行して取得する 日時降順でマージし、合計 pageSize 件を取り出す ソースAとソースBそれぞれで、実際に消費した件数を記録する この処理は、Go の container/heap を使ったストリーミングマージとして実装できます。各ソースの先頭要素をヒープに入れ、日時が最も新しいものを1つずつ取り出しながら pageSize 件を集めます。以下のコードのとおり、各ソースのインデックス(indexA, indexB)がそのまま消費件数を表します。 func Merge(pageSize int32, itemsA, itemsB []*Item) ([]*Item, int32, int32) { indexA, indexB := 0, 0 result := []*Item{} h := &timeHeap{} heap.Init(h) if len(itemsA) > 0 { heap.Push(h, &record{source: SourceA, time: itemsA[0].Timestamp}) } if len(itemsB) > 0 { heap.Push(h, &record{source: SourceB, time: itemsB[0].Timestamp}) } for h.Len() > 0 && len(result) < int(pageSize) { r := heap.Pop(h).(*record) switch r.source { case SourceA: result = append(result, itemsA[indexA]) indexA++ if indexA < len(itemsA) { heap.Push(h, &record{source: SourceA, time: itemsA[indexA].Timestamp}) } case SourceB: result = append(result, itemsB[indexB]) indexB++ if indexB < len(itemsB) { heap.Push(h, &record{source: SourceB, time: itemsB[indexB].Timestamp}) } } } return result, int32(indexA), int32(indexB) } 戻り値の indexA と indexB が、フェーズ2でカーソルを正確に進めるための入力になります。 フェーズ2:カーソルの確定 ソースA、ソースBそれぞれにおいて、フェーズ1と同じ開始位置から消費件数分だけ再取得し、進んだ位置のページネーショントークンを取得する(cursorA, cursorB) pageToken を cursorA:cursorB(参照:次のセクション)とすることで、次ページの取得時に正しい位置からデータを取得できる なお、一方のソースのデータがもう一方より古い場合など、フェーズ1でデータが返ってきたにもかかわらずマージで1件も採用されないケースがあります。この場合は、そのソースのカーソルを前回の位置のまま保持し、次ページのリクエストで再び同じデータを取得してマージの対象にします。また、フェーズ1でデータが0件だった場合は、そのソースを枯渇と判定し、ターミナルトークン _ を設定します。 (図2)フェーズ1:取得とマージ(消費件数を記録) Source A ──(pageSize件)──┐ ├→ Merge → Top N Source B ──(pageSize件)──┘ │ 消費件数を記録 (A=3件, B=2件) (図3)フェーズ2:カーソルの確定(消費件数分だけ進める) Source A ──(消費3件)──→ cursorA Source B ──(消費2件)──→ cursorB → 複合トークン: "cursorA:cursorB" 複合ページネーショントークン設計 2フェーズ取得パターンにより、各ソースで消費件数分だけ進んだカーソルを取得できるようになりました。次に、これらのカーソルをクライアントにどのように渡すかを設計します。今回の一覧取得APIでは、pageToken を各ソースのカーソルを結合した複合トークンとして設計します。 "cursorA:cursorB" 片方のソースが完全に尽きた場合は、ターミナルトークン _ で表現します。トークンがターミナルトークン _ だった場合、API呼び出しをスキップできます。これにより、初回リクエストから片方のソースが枯渇した状態まで、以下のようにページネーショントークンで表現することができます。 トークン 意味 "" (空文字) 初回リクエスト "cursorA:cursorB" 両ソースとも継続あり "_:cursorB" ソースAは枯渇、Bのみ継続 "cursorA:_" ソースBは枯渇、Aのみ継続 "_:_" → "" に変換 全データ取得済み(次ページなし) この複合トークンと2フェーズ取得パターンを組み合わせることで、サーバー側に状態を持たずに、マージした一覧のページネーションを実現できます。 まとめ 本記事では、カーソルベースAPIを持つ複数データソースから一覧を構築する際に直面した「マージで実際に消費した件数」と「APIが返すカーソル位置」のズレという課題と、その解決策を紹介しました。 最初は1回のAPI呼び出しで全てを済ませようとしていましたが行き詰まり、「データを取得するフェーズ」と「カーソルを確定するフェーズ」に分離することで解決できました。1つの処理が複数の責務を担って複雑になったとき、フェーズを分けて各ステップの役割を単純化するアプローチは、ページネーションに限らず設計全般で有効な考え方だと感じています。 このような設計上のトレードオフを実際に手を動かしながら考えられたのは、インターン期間中の貴重な経験でした。FEに限らず幅広く関わらせていただいたことに感謝しています。本当にありがとうございました! 次の記事は@mikupoさんです。引き続きお楽しみください。
はじめに こんにちは。メルペイのAccountingチームでBackend Engineerをしている@hokaoです。 この記事は、Merpay & Mercoin Tech Openness Month 2026 の 5 日目の記事です。 会計データに誤りがあった場合、元のデータを残したまま打ち消すための記録を別途追加するのが会計上の一般的な手法です。本稿では、これをシステムとしてどう扱ったかを設計と実装の観点から紹介します。 背景 Accountingチームでは、会計データを扱うシステムを開発しています。メルカリグループ全体で発生するお金の移動を伴う取引を記録・集計するシステムで、会計イベントの保存と経理向けのレポーティングを責務としています。 会計データは、取引の事実を証明する証拠としての役割を持ちます。そのため、一度記録したデータを後から改変・削除することは原則として許されません。誤りがあったとしても元データを残したまま、打ち消すための記録を別途追加することで修正します。 しかし、私たちのシステムにはこの打ち消すための機能が存在していませんでした。誤って登録された会計データが見つかるたびに、その件数・金額などの情報を手作業で特定し、経理に連携して対応してもらう必要がありました。この運用には、対応コストの大きさや作業ミスのリスクといった構造的な課題がありました。 以降では、会計ドメインの前提を整理した上で、この課題を解決するために導入した打ち消し機能の設計と実装を順に説明します。 会計ドメインの前提 会計では、すべての取引を借方と貸方の 2 つに分けて記録する複式簿記という方式が使われています。借方と貸方それぞれに勘定科目・日付・金額を記載したものが「仕訳」で、これが会計データの最小単位になります。 仕訳に誤りがあった場合、元の仕訳の借方と貸方を入れ替えた「逆仕訳」を計上して打ち消します。 簡単な例として、ある仕入取引を次のように記録していたとします。 借方: 仕入 100 円 貸方: 現金 100 円 この記録が誤りだった場合、逆仕訳は次のようになります。 借方: 現金 100 円 貸方: 仕入 100 円 元の仕訳と逆仕訳を合算すると、勘定科目ごとに借方と貸方が打ち消し合い、金額がゼロになります。元データは残したまま、後から追加した記録によって取引を実質的に打ち消す形になります。 ここで説明したのは、逆仕訳がなぜ打ち消しとして成立するのかという会計上の考え方です。実際の会計レポートでは、必ずしも借方と貸方を足し合わせて相殺しているわけではなく、レポートによっては逆仕訳の金額を符号反転させて打ち消しを表現しています。詳しくは後述します。 逆仕訳の設計と実装 スキーマでの逆仕訳の表現 私たちの会計システムでは、上流のマイクロサービスから受け取った取引はまず会計イベントとして記録され、そこから仕訳が作成される構成になっています。それぞれ AccountingEvents と JournalEntries というテーブルで管理されています。 会計イベント (AccountingEvents) には、上流のマイクロサービスから会計の入力として受け取った取引が記録され、会計処理の種類や取引の中身を保持します。これに対して仕訳 (JournalEntries) は、会計イベントに仕訳ルールを適用して必要な属性が確定した、正式な会計記録です。1 つの会計イベントから、仕訳ルールの適用によって複数の仕訳が作成されることもあります。 逆仕訳の会計イベントには、必ず打ち消し対象となる元の会計イベントが存在します。そのため、AccountingEvents に元の会計イベントへの参照 (OriginalTransactionId) を持たせて関係性を表現します。この参照を持つイベントから作成される仕訳はすべて逆仕訳になります。 また、登録時にはこの参照を辿って、次のようなバリデーションを行います。 元の会計イベントが存在するか 元の会計イベントと整合する勘定科目か 元の会計イベントが既に他の逆仕訳によって打ち消されていないか 会計レポートには JournalEntries から集計するものと、分析用に AccountingEvents から集計するものがあります。どちらの場合でも JOIN なしで逆仕訳の判定ができるよう、JournalEntries には、その仕訳が逆仕訳かどうかを表すフラグ (IsReversal) を持たせています。このフラグは本質的には AccountingEvents の OriginalTransactionId から決まる情報ですが、JournalEntries でも独立に判定できるよう別途持たせています。 同じ会計イベントに対する逆仕訳の会計イベントが複数存在してはいけないため、上述のアプリケーション側のバリデーションに加えて、DB 側にも一意性制約を設けています。具体的には、OriginalTransactionId カラムに NULL_FILTERED オプションを指定した UNIQUE INDEX を張っています。これにより、OriginalTransactionId が NULL のイベント (通常の会計イベント) は重複扱いされず、NULL でない値だけに一意性制約がかかります。 これらをまとめると、スキーマの該当箇所は次のようになります。 CREATE TABLE AccountingEvents ( EventId STRING(100) NOT NULL, AccountingCode STRING(MAX) NOT NULL, OriginalTransactionId STRING(100), -- ... ) PRIMARY KEY (EventId); CREATE NULL_FILTERED INDEX AccountingEventsByOriginalTransactionId ON AccountingEvents(OriginalTransactionId); CREATE TABLE JournalEntries ( Id STRING(100) NOT NULL, EventId STRING(100) NOT NULL, IsReversal BOOL NOT NULL, -- ... ) PRIMARY KEY (Id); 仕訳作成での借方/貸方の入れ替え 逆仕訳の仕訳作成は、元の取引と同じ仕訳ルールを再利用しつつ、ルールから取り出した借方と貸方の属性をコード側で入れ替えて行います。 会計イベントには取引の方向を表す ItemKey という識別子が含まれ、X.to.Y の形式を取ります。仕訳ルールもこの ItemKey をキーに登録されています。逆仕訳イベントでは ItemKey が反転して Y.to.X の形で届くため、ルールを検索する際は ItemKey を一度元の向きに戻します。 ルールを取得した後、逆仕訳イベントの場合に限り、ルールから取り出した借方と貸方の各属性をコード側で入れ替えます。簡略化した擬似コードで示すと次のようになります。 originalRule := getOriginalEventRule(ev) debitAccountingTitleCode := originalRule.DebitAccountingTitleCode creditAccountingTitleCode := originalRule.CreditAccountingTitleCode debitXxx := originalRule.DebitXxx creditXxx := originalRule.CreditXxx // ... if isReversal { debitAccountingTitleCode, creditAccountingTitleCode = creditAccountingTitleCode, debitAccountingTitleCode debitXxx, creditXxx = creditXxx, debitXxx // ... } debit := JournalEntry{ AccountingTitleCode: debitAccountingTitleCode, Xxx: debitXxx, // ... IsReversal: isReversal, } credit := JournalEntry{ AccountingTitleCode: creditAccountingTitleCode, Xxx: creditXxx, // ... IsReversal: isReversal, } 仕訳ルールが借方と貸方の対称な構造を持っているため、ItemKey の反転とそれに続く借方と貸方の入れ替えという 2 つの操作だけで逆仕訳を作成でき、実装はシンプルな修正で済みました。 会計レポートでの逆仕訳の打ち消し 会計レポートに打ち消しを反映するには、集計クエリで逆仕訳または逆仕訳の会計イベントを判定し、正しく金額を計算する必要があります。 JournalEntries から集計するクエリでは IsReversal フラグで判定し、分析用に AccountingEvents から集計するクエリでは OriginalTransactionId IS NOT NULL で判定します。 会計ドメインの前提で述べたとおり、本来の逆仕訳は、勘定科目ごとに借方と貸方を足し合わせて相殺することで打ち消しを実現します。ただし、会計レポートには借方または貸方のどちらか一方だけを集計するものもあり、そうしたレポートでは足し合わせによる相殺は成立しません。そのため、集計クエリで逆仕訳または逆仕訳の会計イベントの金額を符号反転して合算するアプローチをとっています。なお、逆仕訳の金額自体は元の仕訳と同じ正の値で保存しています。 会計レポートは、対象とするテーブルや集計の意味合いがそれぞれ異なるため、共通化が難しく、もともと個別に実装されています。逆仕訳の打ち消しを反映するためには、その個別実装それぞれに修正を入れる必要がありました。例えば WHERE 句の絞り込み条件を、逆仕訳の会計イベントも拾えるように拡張するなどです。すべてのレポートに対してこのような修正を加えていったため、実装と検証には時間がかかりました。 逆仕訳バッチによる運用の自動化 加えて、誤って登録された会計データを訂正するために、逆仕訳を作成するためのバッチを新たに実装しました。このバッチは、対象となる取引の ID リストを受け取り、それぞれについて、上流のマイクロサービスの API を介して打ち消し用の取引を作成します。それが会計イベントとして本システムに登録され、逆仕訳が自動的に作成されます。 個別の取引で失敗が発生した場合はログに記録しつつ、残りの処理は継続します。また、冪等性は上流のマイクロサービス側で保証されているため、同じ ID リストで再実行することも可能です。 これにより、誤ったデータの打ち消しをシステム上で自動的に逆仕訳として反映できるようになり、これまで手作業で行っていた特定・連携の負荷と作業ミスのリスクが大きく軽減されました。 おわりに 本稿では、会計データの訂正を支える逆仕訳機能の設計と実装を紹介しました。 会計のように不変性の制約が強いドメインで開発していると、ドメインの原則が設計判断をそのまま導いてくれる場面が多く、その面白さを今回の機能開発でも改めて感じました。 今回の機能開発は、Accountingチームが抱える運用負荷削減という大きな取り組みの一環でもあります。会計システムは、会社が財務状況を正しく把握するための基盤であり、事業の成長に合わせてスケールできる状態に保ち続ける必要があります。これからも、持続的な開発ができるよう取り組んでいきたいと考えています。 次の記事は imamu さんです。引き続きお楽しみください。
はじめに こんにちは。メルカリのAI Securityエンジニアの@hi120kiです。 メルカリでは、AI AgentサービスDevinを社内の複数チームに展開しています。Devinは自律的にコードの調査・作成・PR提出までをこなせるサービスですが、組織として運用するうえでは管理上の課題がいくつかあります。 本記事ではAI SecurityチームがAI Agent Platformチームと協力し、Devin Enterprise APIを活用したカスタムTerraformプロバイダーと自動管理ツール群を自作しました。これにより、メンバーと権限の管理・シークレットローテーション・APIキーのライフサイクル管理・監査の仕組みを構築した取り組みについて紹介します。 Enterprise運用の課題 メルカリではDevinのEnterpriseプランを採用しています。Remote環境で動作するAI Agentを組織的に運用するためにOktaによるSSO、監査ログ、権限の管理、チームごとの環境分離が必須要件であり、これらを満たすために選定しました。 Devin EnterpriseではCoreプランやTeamプランのように1つのOrganizationを共有するのではなく、Enterpriseという管理基盤から複数のOrganizationを一元管理します。メルカリには複数のビジネス領域にまたがる多数のチームがあり、各チームが扱う情報を分離して保護する必要があります。そのためチームや目的に応じてOrganizationを割り当てています。 ただし、10以上のOrganizationと多数の利用者を抱える環境では、次の課題が生じます。 権限管理の課題 メンバーのOrganizationへのアサインが手動操作に依存 「誰がどのOrganizationに所属しているか」の状態管理が困難 シークレット管理の課題 各Organizationにサードパーティサービスごとの認証情報を個別に設定する必要 シークレットを手動で一斉ローテーションする手間 アクセス権の課題 Devin APIキーの有効期限管理が標準機能として提供されておらず、各Organization内に長期間未ローテーションのAPIキーが残存するリスク Devinの活用が広がるほど管理するOrganizationも増え、これらの課題の負担は拡大します。以前はWeb UIでの手作業に頼っていましたが、2025年末以降DevinがEnterprise向けAPIをv2からv3へ拡充したことで、ほとんどの管理操作をAPI経由で自動化できるようになりました。これを受け、Go言語とGitHub Actionsを用いた管理基盤を内製しています。 Devin APIの概要 Devinはv3 として最新のEnterprise管理向けAPIを提供しています。Enterprise・Organization単位のMember・Role管理や、各OrganizationのSecret・Knowledgeを操作できます。v3 APIで以下の自動管理機能を実現しました。 カスタムTerraformプロバイダー シークレットの一斉ローテーション Google Cloudサービスアカウントキーのローテーション セキュリティ管理基盤との連携 APIキー管理のみv2 APIを使用しています。v2 APIでは複数OrganizationにまたがるAPIキーの作成・取得・削除が可能で、以下を実施しています。 利用者が発行したAPIキーの定期無効化 社内AgentのDevin Wiki利用向けAPIキー管理 これらのAPI仕様はREST形式のAPIとしてDevinの公式ドキュメントにリクエストおよびレスポンスの詳細な仕様とともにドキュメント化されており、一般的なREST APIクライアントを実装することでそれぞれの機能を呼び出すことができます。今回これらのREST APIクライアントは、メルカリ社内で広く用いられているGo言語を用いてそれぞれのAPIが関数に対応するように実装し再利用しやすいように整備しました。 以下の章からそれぞれの管理機能の詳細を紹介します。 1. カスタムTerraformプロバイダー 管理基盤の中核は、Terraform Plugin Frameworkで構築したカスタムTerraformプロバイダーによるOrganizationおよびメンバー管理です。 メルカリではGoogle Cloudをはじめリソース管理にTerraformを広く利用しており、エンジニアが日常的に扱っている点から採用しました。DevinをInfrastructure as Codeで管理すると、メンバー追加や権限変更にPRレビューを挟める・Organizationやメンバーの状態をコードで把握できるようになります。公式のTerraformプロバイダーは現時点で提供されていないため自作しました。 利用者や管理者は各チーム用のOrganizationをTerraformで定義します。ACU(Agent Compute Unit)上限もここで設定し、チームごとの利用量を制御します。max_cycle_acu_limit はOrganization全体のACU上限、max_session_acu_limit は1セッションあたりの上限で、想定外のコスト超過を防ぎます。 resource "devin_organization" "mercari_example_team" { name = "mercari-example-team" max_cycle_acu_limit = 500 max_session_acu_limit = 250 } またメンバーのOrganizationへのアサインもTerraformで宣言的に管理します。 # メンバー定義(メールアドレスで参照) data "devin_member" "mercari_example_team" { for_each = toset([ "user-1@example.com", "user-2@example.com", "user-3@example.com", ]) email = each.value } # Organizationへのアサイン resource "devin_organization_member" "mercari_example_team" { for_each = data.devin_member.mercari_example_team user_id = each.value.user_id org_id = devin_organization.mercari_example_team.org_id org_role_id = "mercari_org_member" } Organizationの追加やACU上限の変更、メンバーの追加・削除は、Terraformコードの変更→PRレビュー→マージという通常の開発フローで行います。terraform plan の出力で「誰がどのOrganizationに追加/削除されるか」が明確にわかり、意図しない権限変更を防げます。 このTerraformプロバイダーではDevin Knowledgeも管理できます。KnowledgeはDevinにおけるAgent Skillのような存在です。メルカリのDevin環境では各チームが別々のOrganizationに分かれており、互いの利用状況を閲覧できません。セキュリティ面では望ましい分離ですが、活用ノウハウの共有が難しくなります。Knowledgeをプロバイダーで管理できるようにし、チーム間での活用ノウハウの配布を可能にしました。 2. シークレットの一斉ローテーション DevinはSessionごとに独立した仮想マシンを起動するため、初期状態ではGitHub等ソースコード管理サービスへの権限しか持ちません。クラウド環境やチケット管理サービスなどへ接続するには、APIキー等の認証情報を個別に設定する必要があります。 一方、DevinはAI Agentとして与えられたAPIキーを自由に扱えるうえ、Organization内のメンバーはSession内部のファイルシステムやシェルにアクセスできるため認証情報の取り扱いには注意が必要です。そこでメルカリでは、Devinに設定するAPIキー群を管理者が一元管理し、短い間隔で定期ローテーションすることで、長期間有効な認証情報がDevin上に残らないようにしています。 ただし手動でのローテーションは負担が大きく、以前は多数のOrganizationの複数Secretをローテーションするだけでかなりの時間を要していました。しかしDevinが2026年1月にSecret管理機能をv3 APIへ追加したことで、これらの操作を自動化できるようになりました。現在のローテーション手順は以下のとおりです。 Devin管理者がそれぞれのサービスで認証情報をローテーションする 新しい認証情報を事前に作成済みのGoogle Cloud Secret Managerに追加する 自動化をGitHub Actions経由で起動する ローテーションが実行され、Secret Managerから各Organizationに配布される これにより、最小限の作業で10以上のOrganizationのシークレットを一斉ローテーションできるようになりました。 3. Google Cloudサービスアカウントキーのローテーション メルカリでは主にGoogle Cloudを利用しておりライブラリの取得やテスト環境との接続にはGoogle Cloudの権限をDevinに付与する必要があります。しかしDevinは現在Workload Identity Federationに対応できるようなOIDCトークン発行機能がないため、サービスアカウントキーを用いる必要があります。 しかし前提として、メルカリではGoogle Cloud公式のベストプラクティスに従い、Organization Policyでサービスアカウントキーの発行を一律禁止しています。このためDevin専用のGoogle Cloud Projectを設け、さらにiam.serviceAccountKeyExpiryHoursを追加のOrganization Policyとして設定しました。これにより、自動化が停止した場合でもサービスアカウントキーは一定期間で無効化されます。 この仕組みのうえで、Organizationごとに個別のサービスアカウントキーを定期ローテーションしながら付与しています。 4. セキュリティ管理基盤との連携 Devin Enterprise採用の要件の一つに監査ログがあります。メルカリではAI Security およびThreat Detection and ResponseチームのAnnaがDevin v3 APIを通じて内製セキュリティ監視プラットフォームとの連携を構築しました。 この連携では、Admin権限を持つEnterprise Service UserとEnterprise Audit Logsエンドポイントを利用しています。これはv2 APIにおけるエンドポイントとは異なりページネーションがあるため、すべての監査ログを正確に取得することができます。これによりGoogle CloudのCloud Run Job を使って5分おきにAPIを取得し、前回取り込んだ最後の監査ログのタイムスタンプ以降の新規監査ログをすべて取得したうえでGoogle CloudのPubSubトピックへと転送しています。そして転送された監査ログはセキュリティ調査のためのBigQueryに保存されます。 5. 利用者が発行したAPIキーの定期無効化 Enterprise全体のAPIキーを全件取得し、作成から一定期間が経過したキーを自動で無効化します。Devinの標準機能にはないセキュリティポリシーを、APIで独自に実装しました。 これらのAPIキーは主にDevin MCPの接続に用いられます。APIキー経由で間接的にソースコードを取得できるため、厳格な管理が求められます。AI Agentを複数利用する開発環境では、使わなくなったAgentの設定ファイルに認証情報が残る・個人のAPIキーを複数人が利用する自作Agentに設定して社内公開してしまう、といった事態が起こりえます。 一定期間経過したAPIキーを自動無効化することで、利用中のAgentだけがAPIキーを保持する状態を維持し、複数人で共有するAgentには、次章で紹介するGoogle Cloud Secret Manager経由のAPIキーを利用させることで、Agentが持つ権限の可視化も実現しました。 6. 社内AgentのDevin Wiki利用向けAPIキー管理 メルカリでは各チームの開発用Organizationとは別に、Devin Wiki用のOrganizationを運用しています。Devin WikiはDevin MCP経由でリポジトリの内容を取得したり、自然言語で検索したりできます。 ソースコードの探索をAI Agentが直接行うとコンテキストを大量に消費します。ソースコード調査が必要な場面ではDevinに処理を委託することで、コンテキスト消費を抑えられます。 ただしDevin MCPの利用にはAPIキーが必要で、前章のとおり一定期間で自動無効化されます。例外となるAPIキーを設けることもできますが、目的外利用を完全には防げません。そこでAPIキーを短い間隔で定期的に再作成し、Google Cloud Secret Managerに保存する自動化を構築しました。 これにより、Devin MCPを利用するAI AgentのサービスアカウントをTerraform上で一元管理し利用状況を可視化するとともに、APIキーの定期再作成による目的外利用の防止も実現しました。 resource "google_secret_manager_secret" "shared_wiki_api_key" { secret_id = "shared-wiki-api-key" } resource "google_secret_manager_secret_iam_member" "shared_wiki_api_key" { for_each = toset(local.accessor_service_accounts_shared_wiki_api_key) secret_id = google_secret_manager_secret.shared_wiki_api_key.secret_id role = "roles/secretmanager.secretAccessor" member = "serviceAccount:${each.value}" } locals { accessor_service_accounts_shared_wiki_api_key = [ "agent-1@---.iam.gserviceaccount.com", "agent-2@---.iam.gserviceaccount.com", ] } 管理操作を動かすCIパイプライン これらの管理操作はすべてGitHub Actionsで自動化しています。SaaS管理向けに独自管理ツールを作る場合、長期的なメンテナンスが避けられません。組織変更時の引き継ぎも考慮すると、依存関係を小さく保ち、メンテナンスしやすい技術・プラットフォームを選ぶ必要があります。 Secret ManagerやサービスアカウントはGoogle Cloud上に置きつつも、処理の実行にはGitHub Actionsを選びました。リポジトリ内の自動化がデプロイなしで直接動作するためメンテナンスの手間が減り、不要なクラウドリソースを持たないことでコストと管理・引き継ぎ時の認知負荷も抑えられます。また定期実行に加え手動トリガー(workflow_dispatch)にも対応しており、緊急時のシークレットローテーションを即座に実行できます。 一方、GitHub Actionsは自由に実行できてしまうため、権限管理や<a href="https://docs.github.com/en/repositories/configuring-branches-and-merges-in-your-repository/managing-rulese
DBRE (DataBase Reliability Engineering)チームの taka-h です。 大規模なデータ更新や削除は、やりたいこと自体はSQLで表現できても、そのまま一度に実行すると運用上のリスクが高くなります。例えば大きなトランザクションが発生すると、レプリケーション遅延やDB負荷の増大、UNDOログの肥大化などにつながり、結果としてサービス影響を招く可能性があります。 そこで私たちは、UPDATE/DELETEのような「最終的にやりたい操作」をSQLに近い形で記述しつつ、実行時には安全な単位に分割して処理できる汎用ツールを実装しました。さらに、実行中に処理速度などの設定を変更できることや、監視結果に応じて自動で一時停止できることなど、実運用で必要になる制御も組み込んでいます。 本記事では、なぜこの問題が起きるのか、従来どのように回避してきたのか、そして今回のツールがどのように安全性と運用性を両立するのかを紹介します。最後に、ツールのREADMEも公開するので、同様の課題を持つ方が自分たちの環境に合わせて実装する際の叩き台として使えるはずです。 なおこのツールは、社内の次のようなデータベース運用の支援を前提とします。 データをアーカイブ/削除する データをバックフィルする データを一括で更新する 大規模データの更新/削除操作における課題 小規模なデータベースであれば、目的のSQLをそのまま実行しても問題にならないことがあります。一方で、一定以上の規模のデータを扱う場合は、同じSQLでも“そのまま一括実行する”こと自体がリスクになります。 主な理由は、処理対象が多いと大きなトランザクションが発生しやすく、その副作用がDB全体に波及するためです。具体的には、変更の伝播(レプリケーションなど)に遅延が発生したり、DBが高負荷になったり、UNDOログが肥大化して回復や性能に影響が出たりします。 このような場合の従来の方針は、「対象を小分けにして処理する」でした。たとえば、対象の主キーをある程度の件数に分割し、短いトランザクションを繰り返すようなSQLを作成してもらったり、専用の使い切りのスクリプトを都度用意して対応していました。 BEGIN; -- 対象の主キーを少量ずつ指定して処理する DELETE FROM items WHERE id IN (...); COMMIT; SLEEP ...; ただし、毎回使い切りのスクリプトを作ったり、対象主キーを取り出して分割したりするのは手間です。依頼者側に“安全な形のSQL”を組み立ててもらう必要が出るなど、運用コストが積み上がっていきます。 そこで、この問題に対して汎用的な解決策を提供するツールを実装しました。 解決策: 汎用化ツール このツールでは、利用者は「最終的に達成したい条件」をSQLに近い形で記述します。一方で実行時には、その条件に合致する対象を主キー単位で取得し、バッチに分割して短いトランザクションを繰り返すことで、安全にUPDATE/DELETEを進められるようにしています。 また、実運用では「削除や更新の進捗」とは独立に、DB全体が高負荷になったり、想定外の問題が発生したりします。そのため、状況に応じて処理速度や挙動を調整できること、そして必要なら自動的に一時停止できることが重要です。 この要件に対して本ツールでは、処理間隔やバッチサイズなどの設定を実行中に変更できる機能を持たせています。これは、MySQLのオンラインスキーマ変更ツールである gh-ost が「実行中に操作を制御できる」点で運用上便利なのと同じ発想です。さらに、監視結果に応じて自動で処理を一時停止する仕組みも組み込んでいます。 最終的なコンフィグ例は上図の通りです。実行したい条件(SQLに近い記述)と、どう安全に実行するか(運用上の関心事項)を分離して設定できます。また、processingに属する項目の多くは実行中に変更可能です。 このツールは主に生成AIを利用して実装し、動作確認のうえ社内で既に利用しています。コード自体のOSSとしての公開にはふみきれなかったのですが、次の章でこのツールのREADME.mdを公開します。これをご利用の環境に合わせた要件の追加、修正をしていただいた上で、生成AIを利用し同様のツールが利用できるようになることを期待しています。 もし試してみて有用だった点や改善アイデアがあれば、SNSなどで議論いただけると嬉しいです。また、「メルカリのDBREチームの公開したREADME.mdで作ってみた」ということで宣伝いただけるとありがたいです。 最後に、現在メルカリでは、この記事の発行者の所属する DBREチーム の EM(Engineering Manager) を募集しています。詳しくはこちらをご覧ください。 汎用データ更新ツールのREADME.md # data-updater A tool for batch data operations (UPDATE, DELETE, or NULL) on database records using primary keys with configurable conditions. ## Features - Cursor-based batch processing with configurable batch size - **Three operation types**: UPDATE, DELETE, and NULL (before_sql only) - **Parallel execution**: SELECT and UPDATE operations run concurrently for better performance - **Replica support**: Route SELECT queries to replica database to reduce primary load - **JOIN support**: Complex queries with multiple tables to identify target records - **Before SQL hooks**: Execute SQL before each batch (archiving, audit logging) - **Custom ORDER BY**: Process records in custom order - Interactive commands for runtime control (similar to gh-ost) - **YAML-based configuration**: All settings in a single configuration file - Real-time status monitoring with ETA - Pause/resume functionality - Dynamic configuration updates - Socket-based remote control interface - **Failed ID tracking**: Records failed updates and displays summary on exit - For batch-level failures: Records only first and last ID of the failed batch - For partial updates: Logs the discrepancy but doesn't track individual IDs - Writes detailed report to file if >100 failures - **Automatic resume**: Saves progress to status file after each batch - Automatically resumes from last successful position on restart - No need to manually track progress or specify resume points - Status files are adapter/table specific for multiple concurrent jobs ## Install ```bash go install github.com/xxx/cmd/data-updater ``` ## Quick Start 1. Create a configuration file: ```yaml # config.yaml database: host: localhost port: 3306 user: myuser password: mypassword database: mydatabase options: charset: utf8mb4 parseTime: "true" processing: batch_size: 1000 interval: 1s adapter: table_name: users pk_columns: - user_id update_sql: "status = 'processed', updated_at = NOW()" where_clause: "status = 'pending'" ``` 2. Run the tool: ```bash # Normal mode - executes updates data-updater --config config.yaml # Debug mode - SELECT only, no updates data-updater --config config.yaml --debug # Resume from specific ID data-updater --config config.yaml --resume-from "12345" # Show version data-updater -v ``` ## Operation Types The tool supports three operation types: ### UPDATE (default) Updates records matching the specified conditions. ```yaml adapter: table_name: users pk_columns: ["user_id"] operation: update # or omit (default) update_sql: "status = 'processed', updated_at = NOW()" where_clause: "status = 'pending'" ``` ### DELETE Deletes records matching the specified conditions. **Important**: The DELETE operation permanently removes data. Always test with --debug mode first. ```yaml adapter: table_name: old_logs pk_columns: ["id"] operation: delete where_clause: "created_at < '2023-01-01'" ``` ### NULL Executes only before_sql without UPDATE or DELETE. Useful for archiving, copying, or transforming data. ```yaml adapter: table_name: items pk_columns: ["id"] operation: "null" before_sql: | INSERT INTO archived_items (id, name, created_at, archived_at) SELECT id, name, created_at, NOW() FROM items WHERE id IN (?) where_clause: "status = 'inactive'" ``` ## Configuration All settings are managed through a YAML configuration file: ### Database Configuration ```yaml database: host: localhost # Database host (default: localhost) port: 3306 # Database port (default: 3306) user: myuser # Database user (required) password: mypassword # Database password (required) database: mydatabase # Database name (required) options: # MySQL connection options (optional) charset: utf8mb4 parseTime: "true" loc: UTC timeout: 30s # Replica configuration (optional) replica_host: replica-db.example.com # SELECT queries go here replica_port: 3306 # Defaults to primary port replica_user: replica_user # Defaults to primary user replica_password: replica_password # Defaults to primary password ``` When replica_host is configured: - SELECT queries (fetching PKs, COUNT) are routed to replica - UPDATE/DELETE operations always use primary - SELECT FOR UPDATE (pessimistic locking) uses primary ### Processing Configuration ```yaml processing: batch_size: 1000 # Number of rows per batch interval: 1s # Time between batches (e.g., 1s, 500ms, 2m) debug_mode: false # Log queries without executing updates pipeline_buffer: 1 # Buffer size for parallel SELECT/UPDATE pessimistic_locking: true # Use SELECT FOR UPDATE (default: true) lock_retry_count: 3 # Number of lock acquisition retries ``` ### Adapter Configuration ```yaml adapter: table_name: users # Target table (required) table_alias: u # Alias for main table (required when using joins) pk_columns: # Primary key column(s) (required) - user_id operation: update # "update" (default), "delete", or "null" update_sql: "status = 'processed'" # SET clause (required for update) before_sql: "..." # SQL to execute before operation (required for null) where_clause: "status = 'pending'" # Additional WHERE (optional) join_clause: "..." # JOIN statements (optional) order_by: "created_at" # Custom ORDER BY (optional, defaults to PK) ``` ### Interactive Control ```yaml interactive: enabled: true # Enable socket-based control socket_path: "/tmp/data-updater.sock" # Unix socket path ``` ### Status File (Automatic Resume) ```yaml status_file: enabled: true # Enable automatic resume path: "/var/lib/status" # Custom path (optional) ``` ## Advanced Features ### JOIN Support Use JOINs for complex queries that need to reference multiple tables: ```yaml adapter: table_name: items table_alias: i pk_columns: ["id"] operation: delete join_clause: | LEFT JOIN transaction_evidences te ON te.item_id = i.id where_clause: | i.status = 'cancel' AND te.id IS NULL ``` **How it works:** 1. SELECT query uses JOINs + WHERE to fetch PKs 2. DELETE/UPDATE query only uses primary keys (no JOINs) ### Before SQL (Pre-operation Hook) Execute SQL before each batch within the same transaction: ```yaml adapter: table_name: items pk_columns: ["id"] operation: delete before_sql: | INSERT INTO deleted_item_ids (id, created, deleted) SELECT id, created, NOW() FROM items WHERE id IN (?) where_clause: "status = 'cancel'" ``` **Notes:** - Use IN (?) placeholder - expanded to all PKs in the batch - For composite keys: (col1, col2) IN (?) - Executed atomica
こんにちは。メルカリ Engineering Office の mikichin です。 2月21日に開催された「Go Conference mini in Sendai 2026」にメルカリはZunda Sponsorをしておりました。今回は参加レポートをお届けします! 「Go Conference mini in Sendai 2026」について 「Go Conference mini in Sendai 2026」はプログラミング言語 Go に関する地域カンファレンスで、東北地方のエンジニアコミュニティの成長と連携を促進し、地域を超えて共に前進していくことを目指すイベントです。 開催概要 日時:2026年2月21日(土) 場所:アーバンネットビル仙台中央 カンファレンスルーム 公式サイト:https://sendaigo.jp/ メルカリメンバーの登壇 なんと、今回のカンファレンスでは4名の登壇がありました! 登壇後の感想とあわせて、登壇資料を共有します。 はじめての Go 〜 きっかけは TinyGo だった / @mikichin まさか Tech PR のわたしが、Go というプログラミング言語を中心としたカンファレンスに登壇する日が来るとは思っていませんでした。とても貴重な機会をいただき、ありがとうございました 🙌 今回の発表が同じように開発をしてみたいと思う誰かの後押しになったり、エンジニアのみなさんが質問をされたときに非エンジニアの人のわからないレベル感が参考になったりしたらうれしいです。 Goに新機能を提案し実装されるまでのフロー徹底解説 〜将来、あなたのアイデアがGoに入るかもしれない。/ @pooh 登壇した時にもお伝えしたのですが、「なぜプロポーザルを出すのか?」「ワクワクしない!?自分の提案、自分のコードがGoに取り込まれるのは楽しいよね!」というのがこの登壇の動機でした。 AI時代になり、自分でコードを書く機会が急激に減ってきています。 Goに貢献する、Goに自分の思いが入っているということを経験できる最後のタイミングかもしれません。 ぜひ!興味を持った人がトライする機会になればいいなと思いました。 仙台は牛タンが美味しかった! AI時代を見据えたコードカバレッジ計測ツールの開発 / @goccy 4年ぶりの仙台での登壇でした。出身地で登壇できるのは感慨深いもので、また地元を盛り上げるために良い発表ができるよう、頑張りたいと強く思いました。 仙台はグルメも最高だし泊まりで来ている場合は終電も気にしなくていいのが最高なはずなのですが、毎回帰省を兼ねていて終電前で実家に帰ってしまうので、次回は会場の近くでホテルをとって朝まで飲み明かすのも良いなと思いつつ、体力が心配。 TODO からはじまるコントリビュート 〜 TinyGo / @micchie 資料リンクはこちら 普段はカンファレンスの主催や運営に携わることが多いのですが、やはりスピーカーとして参加するのは格別です。この発表をきっかけに、5/30 に開催される Women Who Go Tokyo 10th Anniversary イベントの参加者も増えて、2度美味しいです。 ちなみにこの LT のネタはまだ未解決なので、次の技術書典までには修正してマージされたいところです。 会場内の様子 今回のイベントでは、スポンサーブースだけではなくKitchen Senoue(実行委員長 Senoueさんが振る舞う芋煮)や にがおえりんごさんがイラストを書いてくれるブースなどもありました! TinyGo Keeb のブースでは、「電車でTinyGO!」の展示があり遊んでみました。わたしは「電車でGO!」自体遊んだことがなかったので、初体験。全然うまく駅に停車できませんでしたw 「電車でTinyGO!」はプレステの電車でGO用に作られたコントローラーをBVEというPCゲームで遊べるように TinyGo でキーボード化したものとのことです。(参照元:Go Conference mini in Sendai 2026に参加しました) セッションだけではなく、いろいろ楽しめるコンテンツがたくさんありました! まとめ いままで、カンファレンススタッフやスポンサーブースの担当として参加することが多かったのですが、今回スピーカーとしての参加ということで、自分の発表がおわるまではそわそわしたり感想を直接いただいたりと、いつもとは違った経験ができてよかったです! また、今回4名の登壇者がいたので、各セッションの応援に行ったりと終始カンファレンスを楽しむことができました。 そして、なんといっても地方カンファレンスというのはいいですね!その土地ならではのごはんも堪能して大満足です。 最後に、「Go Conference mini in Sendai 2026」の企画運営、おつかれさま & ありがとうございました!また、次回を楽しみにしています!