仕組み

動作シーケンス

Streaming Replicationの動作シーケンスを簡単に示す。

Streaming Replicationの動作シーケンス

レプリケーションの開始まで

Streaming Replicationが起動してレプリケーションを行うまでを概説する。

  1. master起動
  2. masterサーバを起動する。slave側から接続要求が届くまでStreaming Replicationの機能は起動しない。

  3. slave起動
  4. slaveサーバを起動する。
    以降のシーケンスを見ればわかるが、slaveとmasterはどちらを先に起動してもよい。

  5. recoveryプロセス機能
  6. slaveはrevoveryプロセスを起動する。standbyモードで起動すればhot standbyで検索ができる。

  7. walreceiverプロセス起動
  8. slaveのwalreceiverプロセスが起動する。

  9. 接続要求
  10. slave側walreceiverがmasterに接続要求を行う。

  11. walsenderプロセス起動
  12. master側のwalsenderプロセスが起動する。

  13. ハンドシェイク
  14. walsender-walreceiver間でTCPコネクション確立後、walsenderとwalreceiverでハンドシェイクしてデータ交換する。 具体的にはスレーブ側が欲しいWALログの開始位置情報。

  15. walデータ送信
    • walsender
    • データの送信側walsenderは、(wal_sender_delayに設定された)200m秒毎に接続状況を確認しつつ、 walデータが存在すればスレーブ側に送信する。

    • walreceiver
    • データの受信側walreceiverは、基本的に受信待ち(selectかpoll)しながら、データを受け取ったら walデータを書き込む。

切断

masterかslaveがshutdownしようとすると、walsenderとwalreceiver間のコネクションを切断する。 これをトリガーに双方ともプロセス終了へと向かう。
これ以外にも、プロセスが異常終了した場合の終了シーケンスがいくつかあるが、後で詳説する。

ソース

ざっくりソースを読んでみる。

井久保さんのPostgreSQLのソースコード解析資料、 特に postmaster[第4回しくみ分科会勉強会資料]postgresプロセスの概要で、 プロセスの生成と終了についてを読むと、以降の説明が理解しやすい。
資料はver7.3〜ver7.4くらいを対象としているので多少古いが、アウトラインを押さえるには十分である。

以下の説明では明示的ではないが、walsenderとwalreceiverはPostgreSQLのプロセスとしてはかなり違う。
walsenderはユーザがアクセスするバックエンドプロセスと同種のプロセスで、起動/終了シーケンスも同じである。
一方、walreceiverは統計処理やautovacuum、wal writerのようにバックグランドでずっと稼働しつづけるプロセスの一種で、 起動や終了の判断が厳しいし、終了処理自体の作法も異なる。

構成

Streaming Replication関連のソースは~/src/backend/replication/以下。

replication/:
Makefile
README
libpqwalreceiver
walreceiver.c               # スレーブ側プロセス
walreceiverfuncs.c               # スレーブ側プロセスに作用する関数群。他プロセスから呼ばれる。
walsender.c               # マスター側プロセス

replication/libpqwalreceiver/:
Makefile
libpqwalreceiver.c               # スレーブ側通信ライブラリ(スレーブ->マスタ通信)
インクルードファイルは~/src/include/replication/以下:
walreceiver.h
walsender.h

初期化部分

walsender

サーバ起動時には共有メモリに管理領域を確保するだけである。
メモリサイズはWalSndShmemSize(void)@walsender.c、 メモリ領域確保はWalSndShmemInit(void)@walsender.cで行う。

/* Report shared-memory space needed by WalSndShmemInit */
Size
WalSndShmemSize(void)
{
        Size            size = 0;

        size = offsetof(WalSndCtlData, walsnds);
        size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));

        return size;
}

/* Allocate and initialize walsender-related shared memory */
void
WalSndShmemInit(void)

....

確保するデータ領域の構造は以下。~/src/include/replication/walsender.h。

/*
 * Each walsender has a WalSnd struct in shared memory.
 */
typedef struct WalSnd
{
        pid_t           pid;          /* this walsender's process id, or 0 */
        XLogRecPtr      sentPtr;      /* WAL has been sent up to this point */

        slock_t         mutex;        /* locks shared variables shown above */
} WalSnd;

/* There is one WalSndCtl struct for the whole database cluster */
typedef struct
{
        WalSnd          walsnds[1];   /* VARIABLE LENGTH ARRAY */
} WalSndCtlData;

extern WalSndCtlData *WalSndCtl;

このデータ領域は複数のwalsenderや他のプロセスとの情報共有用である。 確保するのは、walsender毎にwalsenderのプロセスIDと送信済のWALログの位置だけ (共有データなので排他制御用のmutexがある)。
WalSndCtlには、postgresql.confのmax_wal_sendersに設定した数だけWalSndが確保される。

どこから接続されたスレーブなのか、接続情報(conninfo)は各walsenderプロセスが独立して持っている。 他のプロセスからは各々の接続情報(conninfo)を取得できないので、SELECT文でスレーブのリストを表示するなどできない。 スレーブの接続情報やwalsenderの稼働状態はpsコマンドで確認するかログをみるしかない。今後の改良に期待したい。


walsenderは、スレーブ(walreceiver)から接続要求によってプロセスが生成され、 スレーブとのTCPコネクション確立 -> ネゴシエーション -> WALデータ送信となる。

実体はWalsenderMain(void)@walsender.cである。

/* Main entry point for walsender process */
int
WalSenderMain(void)
{
        MemoryContext walsnd_context;

        if (RecoveryInProgress())
                ereport(FATAL,
                                (errcode(ERRCODE_CANNOT_CONNECT_NOW),
                                 errmsg("recovery is still in progress, can't accept WAL streaming connections")));

        /* Create a per-walsender data structure in shared memory */
        InitWalSnd();

        walsnd_context = AllocSetContextCreate(TopMemoryContext,
                                                                                   "Wal Sender",
                                                                                   ALLOCSET_DEFAULT_MINSIZE,
                                                                                   ALLOCSET_DEFAULT_INITSIZE,
                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
        MemoryContextSwitchTo(walsnd_context);

        /* Unblock signals (they were blocked when the postmaster forked us) */
        PG_SETMASK(&UnBlockSig);

        /* Tell the standby that walsender is ready for receiving commands */
        ReadyForQuery(DestRemote);

        /* Handle handshake messages before streaming */
        WalSndHandshake();

        /* Main loop of walsender */
        return WalSndLoop();
}

ざっくり説明すると:

  1. 処理用のメモリコンテキスト(walsnd_context)作成
  2. 初期化(InitWalSnd())
  3. シグナルのマスク処理(PG_SETMASK())
  4. 終了処理など設定。具体的にはSIGTERMなど受け取った場合の処理。終了シーケンスの説明で解説予定。
  5. スレーブ(walreceiver)とのネゴシエーション[ReadyForQuery()+WalSndHandshake()]
  6. コネクション確立後、スレーブ側からWALデータの送信開始位置を受信。
    コネクション自体はPostgreSQL標準のプロトコルで行う。 スレーブ側が送ってくる情報は:
    "IDENTIFY_SYSTEM"というメッセージと、"START_REPLICATION x/yyyyyy"というスレーブ側が送信してほしいWALデータの開始位置。 得たデータはWalSndのsentPtrに書き込まれる。
  7. スレーブとの通信開始
  8. WalSndLoop()
walreceiver

サーバ起動時、共有メモリに管理領域を確保して、walreceiverプロセスを起動する。

共有メモリに確保するデータ構造は以下のとおり:

/*
 * Values for WalRcv->walRcvState.
 */
typedef enum
{
        WALRCV_STOPPED,                         /* stopped and mustn't start up again */
        WALRCV_STARTING,                        /* launched, but the process hasn't
                                                                 * initialized yet */
        WALRCV_RUNNING,                         /* walreceiver is running */
        WALRCV_STOPPING                         /* requested to stop, but still running */
} WalRcvState;


/* Shared memory area for management of walreceiver process */
typedef struct
{
        char            conninfo[MAXCONNINFO];

        pid_t           pid;
        WalRcvState walRcvState;
        pg_time_t       startTime;

        XLogRecPtr      receivedUpto;

        slock_t         mutex;                  /* locks shared variables shown above */
} WalRcvData;

extern WalRcvData *WalRcv;

  1. conninfo
  2. マスターへの接続情報。hostとportの対。
  3. pid
  4. walreceiverのプロセスID
  5. walRcvState
  6. walreceiverの状態。常にtypedef enum WalRcvStateで定義された状態のどれかをとる。
  7. startTime
  8. レプリケーションの起動した時間
  9. receivedUpto
  10. 次に受け取るべきWALの位置。"receivedUpto - 1"は既に受け取ったWALの位置。
  11. mutex
  12. 排他制御用。walreceiverはスレーブに1つしかないが、他プロセスがwalRcvStateを変更したりreceivedUptoを読んだりするので。


実体はWalReceiverMain(void)@walreceiver.cである。かなり長いので疑似コード化する。

/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{

	/*
	 *  walrcv->walRcvState が"WALRCV_STARTING"以外エラー(proc_exit(1)かelog(PANIC))
	 */

	/*
	 * walrcvにデータセット
	 */

	/*
	 * walrcvからデータを取得
	 */	

	/*
	 * 終了関連の設定
	 */


	/*
	 * マスターサーバのwalsenderとの通信開始
	 */

        /* 
	 * メインループ
	 */
        for (;;)
        {
		/*
		 * データ待ち && データ取得
		 */
        }
}
  1. walrcvにデータセット
  2. プロセスIDをセットし、状態をWALRCV_STARTINGからWALRCV_RUNNINGへ

            walrcv->pid = MyProcPid;
            walrcv->walRcvState = WALRCV_RUNNING;
    
  3. walrcvからデータを取得
  4. 接続情報conninfoとWALログのスタート地点startpointを取得

            strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
            startpoint = walrcv->receivedUpto;
    
  5. 終了関連の設定
  6. 詳細は終了処理にて。

  7. マスターサーバのwalsenderとの通信開始
  8. マスターサーバとの通信はliblibpqwalreceiver/libpqwalreceiver.cで定義した関数が実行する。 walrcv_connect()の実体はlibpqrcv_connect()@libpqwalreceiver.cである。

            EnableWalRcvImmediateExit();
            walrcv_connect(conninfo, startpoint);
            DisableWalRcvImmediateExit();
    

    EnableWalRcvImmediateExit()@walreceiver.cと DisableWalRcvImmediateExit()@walreceiver.cは 接続中にスレーブサーバの停止コマンドが届いた場合の対処。詳細は終了処理にて。

通常処理

WALデータの受渡しの概略。

walsender

マスター側の正体:WalSndLoop()@walsender.cを概説。こちらも疑似コード。

/* Main loop of walsender process */
static int
WalSndLoop(void)
{
        /* Loop forever */
        for (;;)
        {
	       /*
	        * 終了すべきかどうか判断
		*/

		/*
		 * (デフォルトで)200msecの待ち。
		 */
                remain = WalSndDelay * 1000L;
                while (remain > 0)
                {
                        if (got_SIGHUP || shutdown_requested || ready_to_stop)
                                break;
                        pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
                        CheckClosedConnection();

                        remain -= NAPTIME_PER_CYCLE;
                }

		/*
		 * (もしあれば)WALデータを送信
		 */
                if (!XLogSend(&output_message))
                        goto eof;
        }

eof:
        if (whereToSendOutput == DestRemote)
                whereToSendOutput = DestNone;
        proc_exit(0);
        return 1;                                       /* keep the compiler quiet */
}
  1. 終了すべきかどうか判断
  2. Postmasterプロセスが終了している、SIGHUPを受け取った、SIGUSR2を受け取った、 shutdown_requestedがfalseの場合、プロセス終了に向かう。
    終了の詳細は後述。

                   if (!PostmasterIsAlive(true))
                            exit(1);
                    if (got_SIGHUP)
                    {       got_SIGHUP = false;
                            ProcessConfigFile(PGC_SIGHUP);
                    }
    
                    /*                                                                                                 
                     * When SIGUSR2 arrives, we send all outstanding logs up to the                                    
                     * shutdown checkpoint record (i.e., the latest record) and exit.                                  
                     */
                    if (ready_to_stop)
                    {
                            XLogSend(&output_message);
                            shutdown_requested = true;
                    }
    
                    /* Normal exit from the walsender is here */
                    if (shutdown_requested)
                    {
                            /* Inform the standby that XLOG streaming was done */
                            pq_puttextmessage('C', "COPY 0");
                            pq_flush();
                            proc_exit(0);
                    }
    
  3. (デフォルトで)200msecの待ち
  4. while()ループ内でスレーブとのコネクションが切れていないか確認しながら(CheckClosedConnection())、 WalSndDelayで定義した時間(200msec)待つ。

  5. WALデータ送信
  6. XLogSend()でWALデータを送信。

9.0beta1のソース、宣言部では"static void XLogRead()"と宣言されているが、定義部ではstaticが抜けている(ケアレスミス?)。

walreceiver

スレーブ側のメインループを疑似コードで再掲。

        /* Loop until end-of-streaming or error */
        for (;;)
        {
	       /*
		* 終了すべきかどうか判断
		*/


		/*
		 * データの待ち受け
		 */
                if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
                {
			/*
			 * データを受信した
			 */
			XLogWalRcvProcessMsg(type, buf, len);

			/*
			 * 一度に受信しきれない場合、受信が終わるまで繰り返し受け取る
			 */
                        while (walrcv_receive(0, &type, &buf, &len))
                                XLogWalRcvProcessMsg(type, buf, len);

			/*
			 * 受け取ったデータを書き込む
			 */	
                        XLogWalRcvFlush();
                }
        }
  1. 終了すべきかどうか判断
  2. Postmasterプロセスが終了している、(triggerファイルを見つけたりして)すでにリカバリー状態でない場合、プロセスは終了。 終了処理の詳細は後述。

                    if (!PostmasterIsAlive(true))
                            exit(1);
                    if (!RecoveryInProgress())
                            ereport(FATAL,
                                            (errmsg("cannot continue WAL streaming, recovery has already ended")));
    

                    /* Process any requests or signals received recently */
                    ProcessWalRcvInterrupts();
    
  3. データの待ち受け
  4. データの待ち受けはwalrcv_receive()で行う。実体はlibpqrcv_receive@libpqwalreceiver.cで、 システムコールのpoll()かselect()で待ち受けするオーソドックスな作り。

  5. データの受信と書き込み
  6. XLogWalRcvProcessMsg()@walreceiver.cで受け取り、XLogWalRcvFlush()@walreceiver.cでディスクに書き込む。

終了処理

walsender

  • 異常終了
    • スレーブとのコネクションが切断
    • 周期的(100msec毎)にCheckClosedConnection()@walsender.cで逐次コネクションを確認しているが、接続が切断するとproc_exit(0)で終了する。

      スレーブとのネゴシエーション中(WalHandshake()実行中)にエラーが発生しても、同様にproc_exit(0)で終了する。

    • Postmasterプロセスが終了
    • 周期的(200msec毎)にPostmasterプロセスの死活監視し、終了していた場合はexit(1)で終了する。

    • XLogSend()でデータ転送失敗
    • XLogSend()でWALデータの転送が失敗した場合、proc_exit(0)で終了する。

  • シグナル受信
  • walsenderプロセスが生成されるとき、WalSndSignals()@walsender.cで各種シグナル処理が設定される。
    SIGHUPについては説明を省略する。

    • SIGTERM
    • SIGTERMを受け取ると(shutdown smart)、WalSndShutdownHandler()@walsender.cが起動して"shutdown_requested = true"、WalSndLoop()のfor(;;)ループ内proc_exit(0)で終了。

    • SIGUSR2
    • SIGUSR2を受け取ると、WalSndLastCycleHandler()@walsender.cが起動して"ready_to_stop = true"、WalSndLoop()のfor(;;)ループ内proc_exit(0)で終了。

    • SIGQUIT
    • SIGQUITを受け取ると(shutdown immediate)、WalSndQuickDieHandler()@walsender.cが起動、exit(2)を呼び出して終了する。

なお、on_shmem_exit()に登録されたハンドラWalSndKill()@walsender.cには共有メモリ上に確保したデータ構造WalSndの内容をクリアする(開放freeではない)。

walreceiver

walreceiverの終了処理は少し複雑である。

  • master側が終了した場合
  • walreceiverプロセスは、walwriterやautovacuumとおなじくバックエンドプロセスなので、 起動状態が監視(1秒毎)されていて、なんらかのエラーで終了した場合は再起動する。

    1. エラー発生
    2. libpqrcv_receive()@libpqwalreceiver.cでエラー発生、on_shmem_exit()で登録されたハンドラWalRcvDie()@walreceiver.cが起動

    3. walreceiverプロセス終了
    4. WalRcvDie()は、共有メモリ上のデータ領域walRcvの内容をクリア(freeではない)し、プロセス終了

    5. walreceiverプロセスの再起動
    6. 以降、1秒毎にwalreceiverプロセスを生成し、masterと接続できない場合は上記と同じく生成してはWalRcvDie()で終了を繰り返す。 接続できた場合は通常のレプリケーションを実行

  • walreceiverが異常終了
    • 通信エラー、WALデータの書き込みエラー
    •  なんらかの致命的なエラーが発生した場合は上記master終了と同じく、プロセス終了 -> WalRcvDie() -> walreceiver再起動、を繰り返す?

    • Postmasterプロセスが終了
    • メインループが回る度にPostmasterプロセスの死活検査(PostmasterIsAlive())を行い、終了している場合はexit(1)で終了する。

  • シグナル受信
  • walreceiverプロセス生成時、SIGTERM,SIGQUIT,SIGHUPについてハンドラーが設定される。
    なお、SIGALRM,SIGPIPE,SIGUSR1,SIGUSR2は無視、これら以外はデフォルト動作である。

    • SIGTERM
    • SIGTERMを受け取ると(shutdown smart)、WalRcvShutdownHandler()@walreceiver.cが起動して、got_SIGTERMをtrueにし、条件によってProcessWalRcvInterrupts()を実行。終了処理は行わない。

      /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
      static void
      WalRcvShutdownHandler(SIGNAL_ARGS)
      {
              got_SIGTERM = true;
      
              /* Don't joggle the elbow of proc_exit */
              if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
                      ProcessWalRcvInterrupts();
      }
      
    • SIGQUIT
    • SIGQUITを受け取ると(shutdown immediate)、WalRcvQuickDieHandler()@walreceiver.cが起動して、exit(2)で終了。

    SIGHUPについては説明を省略する。

  • triggerファイルを発見した場合
  • この機能はStreaming Replication由来ではない。recoveryを行ってるプロセスが CheckForStandbyTrigger()@xlog.cを周期的に実行することで実現している。

    static bool
    CheckForStandbyTrigger(void)
    {
            struct stat stat_buf;
    
            if (TriggerFile == NULL)
                    return false;
    
            if (stat(TriggerFile, &stat_buf) == 0)
            {
                    ereport(LOG,
                                    (errmsg("trigger file found: %s", TriggerFile)));
                    ShutdownWalRcv();
                    unlink(TriggerFile);
                    return true;
            }
            return false;
    }
    
    1. triggerファイル発見
    2. CheckForStandbyTrigger()@xlog.cが周期的に実行、triggerファイルを発見したら、ShutdownWalRcv()@walreceiverfunc.cを実行する。

    3. ShutdownWalRcv()実行
    4.  ShutdownWalRcv()の疑似コードを示す。walRcvStateの変更後、walreceiverプロセスにSIGTERMを送っている。

      void
      ShutdownWalRcv(void)
      {
              SpinLockAcquire(&walrcv->mutex);
      	/*
      	 * walRcvStateの変更:
      	 *	WALRCV_STARTING -> WALRCV_STOPPED
      	 *	WALRCV_RUNNING  -> WALRCV_STOPPING
      	 */ 
              SpinLockRelease(&walrcv->mutex);
      
      	/*
      	 * walreceiverにSIGTERMを送る。
      	 */
              if (walrcvpid != 0)
                      kill(walrcvpid, SIGTERM);
      
              /*                                                                                 
               * Wait for walreceiver to acknowledge its death by setting state to               
               * WALRCV_STOPPED.                                                                 
               */
              while (WalRcvInProgress())
              {
                      HandleStartupProcInterrupts();
                      pg_usleep(100000);              /* 100ms */
              }
      }
      

walreceiverについて、walRcvStateや内部変数の複雑な状態遷移はあえて説明を控えた。
次の検証パートでこれらについて重点的に調査する予定である。


Last-modified: 2010-5-24