|
|
仕組み動作シーケンスStreaming Replicationの動作シーケンスを簡単に示す。
レプリケーションの開始までStreaming Replicationが起動してレプリケーションを行うまでを概説する。
masterサーバを起動する。slave側から接続要求が届くまでStreaming Replicationの機能は起動しない。
slaveサーバを起動する。
slaveはrevoveryプロセスを起動する。standbyモードで起動すればhot standbyで検索ができる。 slaveのwalreceiverプロセスが起動する。 slave側walreceiverがmasterに接続要求を行う。 master側のwalsenderプロセスが起動する。 walsender-walreceiver間でTCPコネクション確立後、walsenderとwalreceiverでハンドシェイクしてデータ交換する。 具体的にはスレーブ側が欲しいWALログの開始位置情報。 データの送信側walsenderは、(wal_sender_delayに設定された)200m秒毎に接続状況を確認しつつ、 walデータが存在すればスレーブ側に送信する。 データの受信側walreceiverは、基本的に受信待ち(selectかpoll)しながら、データを受け取ったら walデータを書き込む。 切断
masterかslaveがshutdownしようとすると、walsenderとwalreceiver間のコネクションを切断する。
これをトリガーに双方ともプロセス終了へと向かう。
ソースざっくりソースを読んでみる。
井久保さんのPostgreSQLのソースコード解析資料、
特に
postmaster[第4回しくみ分科会勉強会資料]、
postgresプロセスの概要で、
プロセスの生成と終了についてを読むと、以降の説明が理解しやすい。
以下の説明では明示的ではないが、walsenderとwalreceiverはPostgreSQLのプロセスとしてはかなり違う。
構成Streaming Replication関連のソースは~/src/backend/replication/以下。 replication/: Makefile README libpqwalreceiver walreceiver.c # スレーブ側プロセス walreceiverfuncs.c # スレーブ側プロセスに作用する関数群。他プロセスから呼ばれる。 walsender.c # マスター側プロセス replication/libpqwalreceiver/: Makefile libpqwalreceiver.c # スレーブ側通信ライブラリ(スレーブ->マスタ通信)インクルードファイルは~/src/include/replication/以下: walreceiver.h walsender.h 初期化部分walsender
サーバ起動時には共有メモリに管理領域を確保するだけである。
/* Report shared-memory space needed by WalSndShmemInit */
Size
WalSndShmemSize(void)
{
Size size = 0;
size = offsetof(WalSndCtlData, walsnds);
size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
return size;
}
/* Allocate and initialize walsender-related shared memory */
void
WalSndShmemInit(void)
....
確保するデータ領域の構造は以下。~/src/include/replication/walsender.h。
/*
* Each walsender has a WalSnd struct in shared memory.
*/
typedef struct WalSnd
{
pid_t pid; /* this walsender's process id, or 0 */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
slock_t mutex; /* locks shared variables shown above */
} WalSnd;
/* There is one WalSndCtl struct for the whole database cluster */
typedef struct
{
WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */
} WalSndCtlData;
extern WalSndCtlData *WalSndCtl;
このデータ領域は複数のwalsenderや他のプロセスとの情報共有用である。
確保するのは、walsender毎にwalsenderのプロセスIDと送信済のWALログの位置だけ
(共有データなので排他制御用のmutexがある)。
どこから接続されたスレーブなのか、接続情報(conninfo)は各walsenderプロセスが独立して持っている。 他のプロセスからは各々の接続情報(conninfo)を取得できないので、SELECT文でスレーブのリストを表示するなどできない。 スレーブの接続情報やwalsenderの稼働状態はpsコマンドで確認するかログをみるしかない。今後の改良に期待したい。 walsenderは、スレーブ(walreceiver)から接続要求によってプロセスが生成され、 スレーブとのTCPコネクション確立 -> ネゴシエーション -> WALデータ送信となる。 実体はWalsenderMain(void)@walsender.cである。
/* Main entry point for walsender process */
int
WalSenderMain(void)
{
MemoryContext walsnd_context;
if (RecoveryInProgress())
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("recovery is still in progress, can't accept WAL streaming connections")));
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
walsnd_context = AllocSetContextCreate(TopMemoryContext,
"Wal Sender",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContextSwitchTo(walsnd_context);
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
/* Tell the standby that walsender is ready for receiving commands */
ReadyForQuery(DestRemote);
/* Handle handshake messages before streaming */
WalSndHandshake();
/* Main loop of walsender */
return WalSndLoop();
}
ざっくり説明すると:
コネクション自体はPostgreSQL標準のプロトコルで行う。 スレーブ側が送ってくる情報は: "IDENTIFY_SYSTEM"というメッセージと、"START_REPLICATION x/yyyyyy"というスレーブ側が送信してほしいWALデータの開始位置。 得たデータはWalSndのsentPtrに書き込まれる。 walreceiverサーバ起動時、共有メモリに管理領域を確保して、walreceiverプロセスを起動する。 共有メモリに確保するデータ構造は以下のとおり:
/*
* Values for WalRcv->walRcvState.
*/
typedef enum
{
WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
WALRCV_RUNNING, /* walreceiver is running */
WALRCV_STOPPING /* requested to stop, but still running */
} WalRcvState;
/* Shared memory area for management of walreceiver process */
typedef struct
{
char conninfo[MAXCONNINFO];
pid_t pid;
WalRcvState walRcvState;
pg_time_t startTime;
XLogRecPtr receivedUpto;
slock_t mutex; /* locks shared variables shown above */
} WalRcvData;
extern WalRcvData *WalRcv;
実体はWalReceiverMain(void)@walreceiver.cである。かなり長いので疑似コード化する。
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{
/*
* walrcv->walRcvState が"WALRCV_STARTING"以外エラー(proc_exit(1)かelog(PANIC))
*/
/*
* walrcvにデータセット
*/
/*
* walrcvからデータを取得
*/
/*
* 終了関連の設定
*/
/*
* マスターサーバのwalsenderとの通信開始
*/
/*
* メインループ
*/
for (;;)
{
/*
* データ待ち && データ取得
*/
}
}
プロセスIDをセットし、状態をWALRCV_STARTINGからWALRCV_RUNNINGへ
walrcv->pid = MyProcPid;
walrcv->walRcvState = WALRCV_RUNNING;
接続情報conninfoとWALログのスタート地点startpointを取得
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receivedUpto;
詳細は終了処理にて。 マスターサーバとの通信はliblibpqwalreceiver/libpqwalreceiver.cで定義した関数が実行する。 walrcv_connect()の実体はlibpqrcv_connect()@libpqwalreceiver.cである。
EnableWalRcvImmediateExit();
walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
EnableWalRcvImmediateExit()@walreceiver.cと DisableWalRcvImmediateExit()@walreceiver.cは 接続中にスレーブサーバの停止コマンドが届いた場合の対処。詳細は終了処理にて。 通常処理WALデータの受渡しの概略。 walsenderマスター側の正体:WalSndLoop()@walsender.cを概説。こちらも疑似コード。
/* Main loop of walsender process */
static int
WalSndLoop(void)
{
/* Loop forever */
for (;;)
{
/*
* 終了すべきかどうか判断
*/
/*
* (デフォルトで)200msecの待ち。
*/
remain = WalSndDelay * 1000L;
while (remain > 0)
{
if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
CheckClosedConnection();
remain -= NAPTIME_PER_CYCLE;
}
/*
* (もしあれば)WALデータを送信
*/
if (!XLogSend(&output_message))
goto eof;
}
eof:
if (whereToSendOutput == DestRemote)
whereToSendOutput = DestNone;
proc_exit(0);
return 1; /* keep the compiler quiet */
}
Postmasterプロセスが終了している、SIGHUPを受け取った、SIGUSR2を受け取った、
shutdown_requestedがfalseの場合、プロセス終了に向かう。
if (!PostmasterIsAlive(true))
exit(1);
if (got_SIGHUP)
{ got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* When SIGUSR2 arrives, we send all outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit.
*/
if (ready_to_stop)
{
XLogSend(&output_message);
shutdown_requested = true;
}
/* Normal exit from the walsender is here */
if (shutdown_requested)
{
/* Inform the standby that XLOG streaming was done */
pq_puttextmessage('C', "COPY 0");
pq_flush();
proc_exit(0);
}
while()ループ内でスレーブとのコネクションが切れていないか確認しながら(CheckClosedConnection())、 WalSndDelayで定義した時間(200msec)待つ。 XLogSend()でWALデータを送信。 9.0beta1のソース、宣言部では"static void XLogRead()"と宣言されているが、定義部ではstaticが抜けている(ケアレスミス?)。 walreceiverスレーブ側のメインループを疑似コードで再掲。
/* Loop until end-of-streaming or error */
for (;;)
{
/*
* 終了すべきかどうか判断
*/
/*
* データの待ち受け
*/
if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
/*
* データを受信した
*/
XLogWalRcvProcessMsg(type, buf, len);
/*
* 一度に受信しきれない場合、受信が終わるまで繰り返し受け取る
*/
while (walrcv_receive(0, &type, &buf, &len))
XLogWalRcvProcessMsg(type, buf, len);
/*
* 受け取ったデータを書き込む
*/
XLogWalRcvFlush();
}
}
Postmasterプロセスが終了している、(triggerファイルを見つけたりして)すでにリカバリー状態でない場合、プロセスは終了。 終了処理の詳細は後述。
if (!PostmasterIsAlive(true))
exit(1);
if (!RecoveryInProgress())
ereport(FATAL,
(errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */
ProcessWalRcvInterrupts();
データの待ち受けはwalrcv_receive()で行う。実体はlibpqrcv_receive@libpqwalreceiver.cで、
システムコールのpoll()かselect()で待ち受けするオーソドックスな作り。
XLogWalRcvProcessMsg()@walreceiver.cで受け取り、XLogWalRcvFlush()@walreceiver.cでディスクに書き込む。 終了処理walsender
周期的(100msec毎)にCheckClosedConnection()@walsender.cで逐次コネクションを確認しているが、接続が切断するとproc_exit(0)で終了する。 スレーブとのネゴシエーション中(WalHandshake()実行中)にエラーが発生しても、同様にproc_exit(0)で終了する。 周期的(200msec毎)にPostmasterプロセスの死活監視し、終了していた場合はexit(1)で終了する。 XLogSend()でWALデータの転送が失敗した場合、proc_exit(0)で終了する。
walsenderプロセスが生成されるとき、WalSndSignals()@walsender.cで各種シグナル処理が設定される。
SIGTERMを受け取ると(shutdown smart)、WalSndShutdownHandler()@walsender.cが起動して"shutdown_requested = true"、WalSndLoop()のfor(;;)ループ内proc_exit(0)で終了。 SIGUSR2を受け取ると、WalSndLastCycleHandler()@walsender.cが起動して"ready_to_stop = true"、WalSndLoop()のfor(;;)ループ内proc_exit(0)で終了。 SIGQUITを受け取ると(shutdown immediate)、WalSndQuickDieHandler()@walsender.cが起動、exit(2)を呼び出して終了する。 なお、on_shmem_exit()に登録されたハンドラWalSndKill()@walsender.cには共有メモリ上に確保したデータ構造WalSndの内容をクリアする(開放freeではない)。 walreceiverwalreceiverの終了処理は少し複雑である。
walreceiverプロセスは、walwriterやautovacuumとおなじくバックエンドプロセスなので、 起動状態が監視(1秒毎)されていて、なんらかのエラーで終了した場合は再起動する。 libpqrcv_receive()@libpqwalreceiver.cでエラー発生、on_shmem_exit()で登録されたハンドラWalRcvDie()@walreceiver.cが起動 WalRcvDie()は、共有メモリ上のデータ領域walRcvの内容をクリア(freeではない)し、プロセス終了 以降、1秒毎にwalreceiverプロセスを生成し、masterと接続できない場合は上記と同じく生成してはWalRcvDie()で終了を繰り返す。 接続できた場合は通常のレプリケーションを実行
なんらかの致命的なエラーが発生した場合は上記master終了と同じく、プロセス終了 -> WalRcvDie() -> walreceiver再起動、を繰り返す?。 メインループが回る度にPostmasterプロセスの死活検査(PostmasterIsAlive())を行い、終了している場合はexit(1)で終了する。
walreceiverプロセス生成時、SIGTERM,SIGQUIT,SIGHUPについてハンドラーが設定される。
SIGTERMを受け取ると(shutdown smart)、WalRcvShutdownHandler()@walreceiver.cが起動して、got_SIGTERMをtrueにし、条件によってProcessWalRcvInterrupts()を実行。終了処理は行わない。
/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
static void
WalRcvShutdownHandler(SIGNAL_ARGS)
{
got_SIGTERM = true;
/* Don't joggle the elbow of proc_exit */
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
ProcessWalRcvInterrupts();
}
SIGQUITを受け取ると(shutdown immediate)、WalRcvQuickDieHandler()@walreceiver.cが起動して、exit(2)で終了。 SIGHUPについては説明を省略する。 この機能はStreaming Replication由来ではない。recoveryを行ってるプロセスが CheckForStandbyTrigger()@xlog.cを周期的に実行することで実現している。
static bool
CheckForStandbyTrigger(void)
{
struct stat stat_buf;
if (TriggerFile == NULL)
return false;
if (stat(TriggerFile, &stat_buf) == 0)
{
ereport(LOG,
(errmsg("trigger file found: %s", TriggerFile)));
ShutdownWalRcv();
unlink(TriggerFile);
return true;
}
return false;
}
CheckForStandbyTrigger()@xlog.cが周期的に実行、triggerファイルを発見したら、ShutdownWalRcv()@walreceiverfunc.cを実行する。 ShutdownWalRcv()の疑似コードを示す。walRcvStateの変更後、walreceiverプロセスにSIGTERMを送っている。
void
ShutdownWalRcv(void)
{
SpinLockAcquire(&walrcv->mutex);
/*
* walRcvStateの変更:
* WALRCV_STARTING -> WALRCV_STOPPED
* WALRCV_RUNNING -> WALRCV_STOPPING
*/
SpinLockRelease(&walrcv->mutex);
/*
* walreceiverにSIGTERMを送る。
*/
if (walrcvpid != 0)
kill(walrcvpid, SIGTERM);
/*
* Wait for walreceiver to acknowledge its death by setting state to
* WALRCV_STOPPED.
*/
while (WalRcvInProgress())
{
HandleStartupProcInterrupts();
pg_usleep(100000); /* 100ms */
}
}
walreceiverについて、walRcvStateや内部変数の複雑な状態遷移はあえて説明を控えた。
Last-modified: 2010-5-24
|