/* ---------------------------------------------------------------------------
 * mcb -- simple memcached benchmark
 * version: 1.0RC2
 * author: suzuki hironobu (hironobu@interdb.jp) 2008.Jun.25 0.9
 * author: suzuki hironobu (hironobu@interdb.jp) 2009.Nov.4  1.0RC1
 * author: suzuki hironobu (hironobu@interdb.jp) 2009.Dec.15 1.0RC2
 * Copyright (C) 2008-2009  suzuki hironobu
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * ---------------------------------------------------------------------------
 */
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>

#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif

/*
 * Print a diagnostic tagged with function, file and line to stderr.
 * No trailing semicolon after while (0): the caller supplies it, so the
 * macro is safe inside un-braced if/else bodies.
 */
#define elog(_message_) \
	do { \
		fprintf(stderr, "%s():%s:%u: %s\n", \
			__func__, __FILE__, (unsigned int) __LINE__, \
			_message_); \
		fflush(stderr); \
	} while (0)

/* command types */
#define SET_CMD		0x0001
#define ADD_CMD		0x0002
#define GET_CMD		0x0004
#define QUIT_CMD	0x0008

#define MAX_LINE	4028	/* size of one recv() chunk */
#define MAX_ADDR_LEN	128
#define MAX_THREADS	256
#define MAX_PORT	65535
#define UDP_HEADER_LEN	8	/* memcached UDP frame header */
#define CMD_OVERHEAD	100	/* room for verb, key, flags, length, CRLFs */

/* connection types */
#define TCP		0x0001
#define UDP		0x0002
#define UNIX_SOCKET	0x0004

typedef struct {
	char addr[MAX_ADDR_LEN];	/* IPv4 address or unix socket path */
	int port;
	int thread_num;
	int command_num;		/* commands sent by each thread */
	int max_key;
	int command;			/* SET_CMD, ADD_CMD or GET_CMD */
	int data_len;			/* average value length */
	int verbose;
	int type;			/* TCP, UDP or UNIX_SOCKET */
#ifdef _DEBUG_
	int debug;
#endif
	bool single_command;		/* reconnect for every command */
} sysval_t;

struct stat_time {
	struct timeval begin;
	struct timeval end;
};
typedef struct stat_time stat_data_t;

/*
 * memcached's command formats.
 * set/add take: key, flags, exptime, byte count, then the value, which is
 * printed with an explicit precision so the caller need not NUL-terminate
 * the slice it wants to send.
 */
static const char set_cmd_fmt[] = "set %d %u %s %zu\r\n%.*s\r\n";
static const char add_cmd_fmt[] = "add %d %u %s %zu\r\n%.*s\r\n";
static const char get_cmd_fmt[] = "get %d\r\n";
static const char quit_cmd_fmt[] = "quit\r\n";

/*
 * declaration
 */
static void *master_thread(void *arg);
static double get_interval(struct timeval bt, struct timeval et);
static int do_close(const int fd);
static int do_connect(const char *addr, const int port);
static int build_mc_cmd(char *buff, const int buff_size, const int cmd_type,
			const int key, const char *val, const size_t val_len,
			const unsigned long int id);
static int do_cmd(const int fd, const char *cmd, const int cmd_len,
		  const unsigned long int id);
static void *connector_thread(void *arg);
static int connect_or_die(void);
static void quit_and_close(const int fd, char *buff, const int buff_size,
			   const unsigned long int id);
static double next_uniform(uint32_t *state);
static int parse_int(const char *text);
static void usage(void);
static void init_sysval(void);

/*
 * global variables
 */
static sysval_t sysval;

static pthread_t *connector_tptr;
static pthread_t tid;
static stat_data_t *stat_data;

/* start barrier: connectors wait until all of them have been created */
static pthread_mutex_t begin_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t begin_cond = PTHREAD_COND_INITIALIZER;
static unsigned int begin_thread_num;

/* completion counter: the master waits until every connector is done */
static pthread_mutex_t end_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t end_cond = PTHREAD_COND_INITIALIZER;
static unsigned int end_thread_num;

static struct timeval stat_data_begin;
static struct timeval stat_data_end;

/*
 * master
 *
 * Waits until every connector thread has reported completion, then prints
 * the benchmark conditions and the timing summary to stdout.
 * The argument is unused; the function always returns NULL.
 */
static void *master_thread(void *arg)
{
	int i;
	double tmp_itvl;
	double min_itvl = 0x7fffffff;
	double ave_itvl;
	double max_itvl = 0.0;
	long double itvl = 0.0;
	long long total_cmds;

	(void) arg;
	assert(0 < sysval.thread_num && sysval.thread_num <= MAX_THREADS);

	/*
	 * wait for all threads end
	 */
	pthread_mutex_lock(&end_mtx);
	while ((unsigned int) sysval.thread_num != end_thread_num) {
		pthread_cond_wait(&end_cond, &end_mtx);
	}
	pthread_mutex_unlock(&end_mtx);

	/*
	 * display result
	 */
	gettimeofday(&stat_data_end, NULL);

	for (i = 0; i < sysval.thread_num; i++) {
		tmp_itvl = get_interval(stat_data[i].begin, stat_data[i].end);
		itvl += tmp_itvl;
		if (max_itvl < tmp_itvl)
			max_itvl = tmp_itvl;
		if (tmp_itvl < min_itvl)
			min_itvl = tmp_itvl;
		if (1 <= sysval.verbose)
			fprintf(stdout, "thread(%d) end %f[sec]\n", i, tmp_itvl);
	}

	ave_itvl = (double) (itvl / sysval.thread_num);
	tmp_itvl = get_interval(stat_data_begin, stat_data_end);
	/* widen before multiplying: the int product can overflow */
	total_cmds = (long long) sysval.command_num * sysval.thread_num;

	fprintf(stdout, "condition =>\n");
	if (sysval.type == UNIX_SOCKET) {
		fprintf(stdout, "\tconnect to localhost:(%s)\n", sysval.addr);
	} else {
		fprintf(stdout, "\tconnect to %s %s port %d\n", sysval.addr,
			(sysval.type == TCP ? "TCP" : "UDP"), sysval.port);
	}

	if (sysval.command == SET_CMD)
		fprintf(stdout, "\tcommand = set\n");
	else if (sysval.command == ADD_CMD)
		fprintf(stdout, "\tcommand = add\n");
	else if (sysval.command == GET_CMD)
		fprintf(stdout, "\tcommand = get\n");

	fprintf(stdout, "\t%d thread run\n", sysval.thread_num);
	fprintf(stdout, "\tsend %d command a thread, total %lld command\n",
		sysval.command_num, total_cmds);
	fprintf(stdout, "\tdata length = %d\n", sysval.data_len);

	fprintf(stdout, "result =>\n\tinterval =  %f [sec]\n", tmp_itvl);
	if (tmp_itvl != 0.0)
		fprintf(stdout, "\tperformance =  %f [command/sec]\n",
			(double) total_cmds / tmp_itvl);
	fprintf(stdout,
		"\tthread info:\n\t  ave. = %f[sec], min = %f[sec], max = %f[sec]\n",
		ave_itvl, min_itvl, max_itvl);

	return NULL;
}

/*
 * Returns the elapsed time from bt to et in seconds.
 */
static double get_interval(struct timeval bt, struct timeval et)
{
	const double b = (double) bt.tv_sec + (double) bt.tv_usec * 1e-6;
	const double e = (double) et.tv_sec + (double) et.tv_usec * 1e-6;

	return e - b;
}

/*
 * connector
 */

/* Closes fd and returns the result of close(). */
static int do_close(const int fd)
{
	return close(fd);
}

/*
 * Opens a socket of the type selected by sysval.type and connects it.
 *
 * addr is a unix socket path (UNIX_SOCKET) or a dotted IPv4 address
 * (TCP/UDP); port is only used for TCP/UDP.
 * Returns the connected descriptor, or -1 if the address is unusable or
 * connect() fails.  Failing to create the socket itself terminates the
 * process.
 */
static int do_connect(const char *addr, const int port)
{
	int fd;
	int sockopt = 1;
	struct sockaddr_in serv_addr_in;
	struct sockaddr_un serv_addr_un;

	assert(addr != NULL);

	if (sysval.type == UNIX_SOCKET) {
		/* UNIX STREAM SOCKET */
		const size_t path_len = strlen(addr);

		/* refuse rather than silently connect to a truncated path */
		if (path_len >= sizeof(serv_addr_un.sun_path)) {
			elog("unix socket path is too long");
			return -1;
		}
		memset(&serv_addr_un, 0, sizeof(serv_addr_un));
		serv_addr_un.sun_family = AF_UNIX;
		/* the struct is zeroed, so the copied path stays terminated */
		memcpy(serv_addr_un.sun_path, addr, path_len);

		if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
			elog("can't open unix stream socket");
			exit(-1);
		}
		if (connect(fd, (struct sockaddr *) &serv_addr_un,
			    sizeof(serv_addr_un)) < 0) {
			do_close(fd);
			fd = -1;
		}
		return fd;
	}

	/* TCP or UDP */
	memset(&serv_addr_in, 0, sizeof(serv_addr_in));
	serv_addr_in.sin_family = AF_INET;
	serv_addr_in.sin_addr.s_addr = inet_addr(addr);
	serv_addr_in.sin_port = htons((uint16_t) port);

	/* inet_addr() reports a malformed address as (in_addr_t) -1 */
	if (serv_addr_in.sin_addr.s_addr == (in_addr_t) -1) {
		elog("invalid IPv4 address");
		return -1;
	}

	if (sysval.type == UDP) {
		if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
			elog("can't open datagram socket");
			exit(-1);
		}
	} else {
		if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
			elog("can't open stream socket");
			exit(-1);
		}
	}

	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(sockopt));
	if (connect(fd, (struct sockaddr *) &serv_addr_in,
		    sizeof(serv_addr_in)) < 0) {
		do_close(fd);
		fd = -1;
	}

	return fd;
}

/*
 * Formats one memcached text command into buff.
 *
 * buff/buff_size : destination buffer and its capacity in bytes
 * cmd_type       : SET_CMD, ADD_CMD, GET_CMD or QUIT_CMD
 * key            : numeric key (ignored for QUIT_CMD)
 * val/val_len    : value bytes and how many of them to send (set/add only)
 * id             : request id; its low 16 bits go into the UDP frame header
 *
 * When sysval.type is UDP the command is preceded by the 8-byte frame
 * header.  Returns the number of bytes to send (header included), or -1 if
 * the arguments are unusable or the command does not fit in buff.  An
 * unknown cmd_type aborts the process.
 */
static int build_mc_cmd(char *buff, const int buff_size, const int cmd_type,
			const int key, const char *val, const size_t val_len,
			const unsigned long int id)
{
	const int s = (sysval.type == UDP) ? UDP_HEADER_LEN : 0;
	char *body;
	size_t room;
	int len = -1;

	if (buff == NULL || buff_size <= s)
		return -1;

	/*
	 * No whole-buffer memset: snprintf() terminates the text and every
	 * header byte is written explicitly below.
	 */
	body = buff + s;
	room = (size_t) (buff_size - s);

	switch (cmd_type) {
	case SET_CMD:
		if (val == NULL || val_len > (size_t) INT_MAX)
			return -1;
		len = snprintf(body, room, set_cmd_fmt, key, (unsigned int) 0,
			       "0", val_len, (int) val_len, val);
		break;
	case ADD_CMD:
		if (val == NULL || val_len > (size_t) INT_MAX)
			return -1;
		len = snprintf(body, room, add_cmd_fmt, key, (unsigned int) 0,
			       "0", val_len, (int) val_len, val);
		break;
	case GET_CMD:
		len = snprintf(body, room, get_cmd_fmt, key);
		break;
	case QUIT_CMD:
		len = snprintf(body, room, "%s", quit_cmd_fmt);
		break;
	default:
		fprintf(stderr, "%s():%s:%u: %d command not supported\n",
			__func__, __FILE__, (unsigned int) __LINE__, cmd_type);
		abort();
	}

	/* snprintf() returns the untruncated length; >= room means it was cut */
	if (len < 0 || (size_t) len >= room)
		return -1;

	if (s != 0) {
		buff[0] = (char) ((id >> 8) & 0xff);
		buff[1] = (char) (id & 0xff);	/* Request ID */
		buff[2] = (char) 0;
		buff[3] = (char) 0;		/* Sequence number */
		buff[4] = (char) 0;
		buff[5] = (char) 1;		/* Total number of datagrams: must be "01" */
		buff[6] = (char) 0;
		buff[7] = (char) 0;		/* Reserve must be 0 */
		len += s;
	}
	return len;
}

/*
 * Sends cmd (cmd_len bytes) on fd and reads the reply.
 *
 * The reply is read in MAX_LINE-sized chunks until a short read; its content
 * is discarded except that, for UDP, the request id in the frame header is
 * compared with id and a mismatch is reported on stderr.
 * select() is called without a timeout, so this blocks until the peer
 * answers or closes the connection.
 *
 * Returns the size of the last chunk read, 0 if select() reports no ready
 * descriptor, or -1 on error.
 */
static int do_cmd(const int fd, const char *cmd, const int cmd_len,
		  const unsigned long int id)
{
	int ret;
	ssize_t len = -1;
	fd_set fds;
	char result[MAX_LINE + 1];	/* +1 keeps room for a terminator */

	/* FD_SET() on a descriptor outside [0, FD_SETSIZE) is undefined */
	if (fd < 0 || fd >= FD_SETSIZE || cmd == NULL || cmd_len < 0) {
		elog("invalid argument");
		return -1;
	}

#ifdef _DEBUG_
	if (2 <= sysval.debug) {
		if (sysval.type == UDP)
			fprintf(stdout, "CMD: %s\n", &cmd[UDP_HEADER_LEN]);
		else
			fprintf(stdout, "CMD: %s\n", cmd);
	}
#endif

	/* send command to memcached */
	if (send(fd, cmd, (size_t) cmd_len, 0) != (ssize_t) cmd_len) {
		elog("send error");
		return -1;
	}

	/* wait for the reply; restart if a signal interrupts the wait */
	do {
		FD_ZERO(&fds);
		FD_SET(fd, &fds);
		ret = select(fd + 1, &fds, NULL, NULL, NULL);
	} while (ret < 0 && errno == EINTR);

	if (ret < 0) {
		elog("select error");
		return -1;
	}
	if (ret == 0) {		/* timeout */
		elog("select timeout");
		return 0;
	}

	/* receive result */
	if (FD_ISSET(fd, &fds)) {
		do {
			if ((len = recv(fd, result, MAX_LINE, 0)) < 0) {
				elog("read error");
				return -1;
			}
		} while (len == MAX_LINE);
	}

	if (sysval.type == UDP) {
		/* only look at the header if a whole header was received */
		if (len < UDP_HEADER_LEN
		    || (unsigned long) (unsigned char) result[0] != ((id >> 8) & 0xff)
		    || (unsigned long) (unsigned char) result[1] != (id & 0xff))
			fprintf(stderr, "Error: UDP RequestID error\n");
	}

#ifdef _DEBUG_
	if (2 <= sysval.debug && 0 <= len) {
		result[len] = '\0';
		if (sysval.type == UDP)
			fprintf(stdout, "RESULT: %s\n",
				len >= UDP_HEADER_LEN ? &result[UDP_HEADER_LEN] : "");
		else
			fprintf(stdout, "RESULT: %s\n", result);
	}
#endif

	return (int) len;
}

/*
 * Connects to the configured server, or prints a diagnostic and terminates
 * the process if that fails.  Returns the connected descriptor.
 */
static int connect_or_die(void)
{
	const int fd = do_connect(sysval.addr, sysval.port);

	if (fd == -1) {
		fprintf(stderr, "Error: can not connect to memcached\n");
		if (sysval.type == UNIX_SOCKET)
			fprintf(stderr,
				"\tHint => Check permition of unix socket parh(%s).\n",
				sysval.addr);
		exit(-1);
	}
	return fd;
}

/*
 * Sends "quit" on fd, waits for the peer's reaction and closes fd.
 */
static void quit_and_close(const int fd, char *buff, const int buff_size,
			   const unsigned long int id)
{
	const int len = build_mc_cmd(buff, buff_size, QUIT_CMD, 0, NULL, 0, id);

	if (0 <= len)
		do_cmd(fd, buff, len, id);
	do_close(fd);
}

/*
 * Per-thread xorshift32 generator; returns a value in [0, 1).
 * Each connector owns its state, so threads neither share nor contend on
 * the C library's global rand() state.  *state must be non-zero.
 */
static double next_uniform(uint32_t *state)
{
	uint32_t x = *state;

	x ^= x << 13;
	x ^= x >> 17;
	x ^= x << 5;
	*state = x;
	return (double) x / 4294967296.0;
}

/*
 * Body of one benchmark thread.
 *
 * arg carries the thread index (0 .. thread_num - 1) as an integer stored in
 * the pointer.  The thread waits until all connectors exist, then sends
 * sysval.command_num commands with random keys in [1, max_key] and random
 * value lengths in [1, 2 * data_len], records its begin/end time in
 * stat_data[index] and finally signals the master thread.
 * Always returns NULL; unrecoverable errors terminate the process.
 */
static void *connector_thread(void *arg)
{
	const int no = (int) (intptr_t) arg;
	const size_t data_max = (size_t) sysval.data_len * 2;
	const int buff_size = sysval.data_len * 2 + CMD_OVERHEAD;
	/* UDP always keeps a single socket for the whole run */
	const bool per_cmd_conn =
		(sysval.single_command == true && sysval.type != UDP);
	int fd = -1;
	int i, key, len;
	size_t str_len;
	unsigned long int id;
	uint32_t rng;
	char *buff, *data;

	assert(0 <= no && no < sysval.thread_num);

	if ((buff = calloc(1, (size_t) buff_size)) == NULL) {
		elog("calloc error");
		exit(-1);
	}
	if ((data = calloc(1, data_max + 1)) == NULL) {
		elog("calloc error");
		exit(-1);
	}
	memset(data, 'C', data_max);	/* payload; calloc left the terminator */

	/*
	 * increment begin_thread_num,
	 * and wait for broadcast signal from last created thread
	 */
	if (sysval.thread_num != 1) {
		pthread_mutex_lock(&begin_mtx);
		begin_thread_num++;
		if (begin_thread_num == (unsigned int) sysval.thread_num) {
			pthread_cond_broadcast(&begin_cond);
		} else {
			while (begin_thread_num < (unsigned int) sysval.thread_num)
				pthread_cond_wait(&begin_cond, &begin_mtx);
		}
		pthread_mutex_unlock(&begin_mtx);
	}

	/* prepare a random seed, connection file descriptor */
	gettimeofday(&stat_data[no].begin, NULL);
	rng = (uint32_t) stat_data[no].begin.tv_sec
		^ ((uint32_t) stat_data[no].begin.tv_usec << 12)
		^ ((uint32_t) (no + 1) * 2654435761u);
	if (rng == 0)
		rng = 1;	/* xorshift must not start from zero */

	if (!per_cmd_conn)
		fd = connect_or_die();

	/*
	 * main loop
	 */
	id = (unsigned long int) sysval.command_num * (unsigned long int) (no + 1);
	for (i = 0; i < sysval.command_num; i++) {
		id++;

		if (per_cmd_conn)
			fd = connect_or_die();

		key = 1 + (int) ((double) sysval.max_key * next_uniform(&rng));
		str_len = 1 + (size_t) ((double) data_max * next_uniform(&rng));

		/* the value length is passed explicitly; data is never modified */
		len = build_mc_cmd(buff, buff_size, sysval.command, key, data,
				   str_len, id);
		if (len < 0) {
			elog("command does not fit in buffer");
			exit(-1);
		}
		do_cmd(fd, buff, len, id);

		if (per_cmd_conn)
			quit_and_close(fd, buff, buff_size, id);
	}

	if (!per_cmd_conn) {
		if (sysval.type != UDP)
			quit_and_close(fd, buff, buff_size, id);
		else
			do_close(fd);	/* no "quit" over UDP, just release the socket */
	}

	/*
	 * send signal
	 */
	gettimeofday(&stat_data[no].end, NULL);

	pthread_mutex_lock(&end_mtx);
	end_thread_num++;
	pthread_cond_signal(&end_cond);
	pthread_mutex_unlock(&end_mtx);

	free(buff);
	free(data);
	return NULL;
}

/* Prints the command line help to stderr. */
static void usage(void)
{
	fprintf(stderr, "mcb -- simple memcached benck mark\n");
	fprintf(stderr, "usage: mcb -c {set|add|get} [OPTIONS]\n");
	fprintf(stderr, "\t\t-c command_type {set|add|get}\n");
	fprintf(stderr, "\t\t-a server_address[127.0.0.1]\n");
	fprintf(stderr, "\t\t-f unix socket path\n");
	fprintf(stderr, "\t\t-p server_port[11211]\n");
	fprintf(stderr, "\t\t-T connection Type {TCP|UDP|UNIX_SOCKET}[TCP]\n");
	fprintf(stderr, "\t\t-t number_of_thread[1]\n");
	fprintf(stderr, "\t\t-n number_of_command[1]\n");
	fprintf(stderr, "\t\t-m max_key[1000]\n");
	fprintf(stderr, "\t\t-l average data_length[1024]\n");
	fprintf(stderr, "\t\t-v :verbose\n");
#ifdef _DEBUG_
	fprintf(stderr, "\t\t-d debug_level :debug[0-2]\n");
#endif
	fprintf(stderr, "\t\t-s :send and disconnect each command\n");
	fprintf(stderr,
		"\t\t\t\t\t(Default => send all commands per one session)\n");
	fprintf(stderr, "\t\t-h :help\n");
}

/* Resets sysval to the documented defaults. */
static void init_sysval(void)
{
	sysval.addr[0] = '\0';
	sysval.port = 11211;
	sysval.thread_num = 1;
	sysval.command_num = 1;
	sysval.max_key = 1000;
	sysval.data_len = 1024;
	sysval.command = -1;
	sysval.verbose = 0;
	sysval.type = TCP;
#ifdef _DEBUG_
	sysval.debug = 0;
#endif
	sysval.single_command = false;
}

/*
 * Converts a decimal option argument to int.
 * Values outside the int range are clamped instead of being silently
 * truncated; text without digits yields 0, which every caller rejects.
 */
static int parse_int(const char *text)
{
	long v;

	errno = 0;
	v = strtol(text, NULL, 10);
	if (errno == ERANGE)
		return (v < 0) ? INT_MIN : INT_MAX;
	if (v > INT_MAX)
		return INT_MAX;
	if (v < INT_MIN)
		return INT_MIN;
	return (int) v;
}

/*
 * Parses the options, starts the master thread and thread_num connector
 * threads, waits for all of them and releases the shared resources.
 */
int main(int argc, char **argv)
{
#ifdef _DEBUG_
	static const char optstring[] = "a:p:t:n:m:c:l:vVhsd:T:f:";
#else
	static const char optstring[] = "a:p:t:n:m:c:l:vVhsT:f:";
#endif
	int c;		/* int, not char: getopt() returns -1 at the end */
	int i, max_key;
	void *ret = NULL;

	/*
	 * init
	 */
	begin_thread_num = 0;
	end_thread_num = 0;
	init_sysval();

	while ((c = getopt(argc, argv, optstring)) != -1) {
		switch (c) {
		case 'a':	/* server address */
		case 'f':	/* unix socket path */
			if (strlen(optarg) >= sizeof(sysval.addr)) {
				fprintf(stderr, "Error: address %s is too long\n",
					optarg);
				exit(-1);
			}
			memcpy(sysval.addr, optarg, strlen(optarg) + 1);
			break;
		case 'p':	/* server port */
			sysval.port = parse_int(optarg);
			if (sysval.port <= 0 || MAX_PORT < sysval.port) {
				fprintf(stderr, "Error: port number %d is not valid\n",
					sysval.port);
				exit(-1);
			}
			break;
		case 't':	/* number of thread */
			sysval.thread_num = parse_int(optarg);
			if (MAX_THREADS < sysval.thread_num)
				sysval.thread_num = MAX_THREADS;
			if (sysval.thread_num <= 0) {
				fprintf(stderr, "Error: thread number %d is not valid\n",
					sysval.thread_num);
				exit(-1);
			}
			break;
		case 'n':	/* number of command */
			sysval.command_num = parse_int(optarg);
			if (sysval.command_num <= 0) {
				fprintf(stderr, "Error: command number %d is not valid\n",
					sysval.command_num);
				exit(-1);
			}
			break;
		case 'm':	/* max key value */
			max_key = parse_int(optarg);
			if (max_key <= 0) {
				/* keep the default, as the message promises */
				fprintf(stderr,
					"Error: max key %d is not valid. Set default.\n",
					max_key);
			} else {
				sysval.max_key = max_key;
			}
			break;
		case 'c':	/* command type */
			if (strcmp(optarg, "set") == 0) {
				sysval.command = SET_CMD;
			} else if (strcmp(optarg, "add") == 0) {
				sysval.command = ADD_CMD;
			} else if (strcmp(optarg, "get") == 0) {
				sysval.command = GET_CMD;
			} else {
				usage();
				exit(-1);
			}
			break;
		case 'l':	/* data length */
			sysval.data_len = parse_int(optarg);
			/* upper bound keeps data_len * 2 + CMD_OVERHEAD inside int */
			if (sysval.data_len <= 0
			    || (INT_MAX - CMD_OVERHEAD) / 2 < sysval.data_len) {
				fprintf(stderr, "Error: data_lenngth %d is not valid\n",
					sysval.data_len);
				exit(-1);
			}
			break;
		case 'v':
			sysval.verbose = 1;
			break;
		case 'V':
			sysval.verbose = 2;
			break;
#ifdef _DEBUG_
		case 'd':
			sysval.debug = parse_int(optarg);
			break;
#endif
		case 's':
			sysval.single_command = true;
			break;
		case 'T':	/* connection type */
			if (strcmp(optarg, "TCP") == 0
			    || strcmp(optarg, "tcp") == 0) {
				sysval.type = TCP;
			} else if (strcmp(optarg, "UDP") == 0
				   || strcmp(optarg, "udp") == 0) {
				sysval.type = UDP;
			} else if (strcmp(optarg, "SOCKET") == 0
				   || strcmp(optarg, "socket") == 0
				   || strcmp(optarg, "UNIX_SOCKET") == 0
				   || strcmp(optarg, "unix_socket") == 0) {
				sysval.type = UNIX_SOCKET;
			} else {
				usage();
				exit(-1);
			}
			break;
		case 'h':	/* help */
			usage();
			exit(0);
		default:
			fprintf(stderr, "ERROR: option error: -%c is not valid\n",
				optopt);
			exit(-1);
		}
	}

	if (sysval.command == -1) {
		fprintf(stderr, "error: option -c not set\n");
		usage();
		exit(-1);
	}
	if (sysval.type == UNIX_SOCKET && sysval.addr[0] == '\0') {
		fprintf(stderr, "error: option -f not set\n");
		usage();
		exit(-1);
	}
	if (sysval.addr[0] == '\0')
		strcpy(sysval.addr, "127.0.0.1");

	/*
	 * main work
	 */
	if ((stat_data = calloc((size_t) sysval.thread_num,
				sizeof(*stat_data))) == NULL) {
		elog("calloc error");
		exit(-1);
	}
	if ((connector_tptr = calloc((size_t) sysval.thread_num,
				     sizeof(*connector_tptr))) == NULL) {
		elog("calloc error");
		exit(-1);
	}

	gettimeofday(&stat_data_begin, NULL);

	if (pthread_create(&tid, NULL, master_thread, NULL) != 0) {
		elog("pthread_create() error");
		exit(-1);
	}
	for (i = 0; i < sysval.thread_num; i++) {
		/* the thread index travels inside the pointer argument */
		if (pthread_create(&connector_tptr[i], NULL, connector_thread,
				   (void *) (intptr_t) i) != 0) {
			elog("pthread_create() error");
			exit(-1);
		}
	}

	if (pthread_join(tid, &ret)) {
		elog("pthread_join() error");
		exit(-1);
	}
	for (i = 0; i < sysval.thread_num; i++) {
		if (pthread_join(connector_tptr[i], &ret)) {
			elog("pthread_join() error");
			exit(-1);
		}
	}

	pthread_cond_destroy(&begin_cond);
	pthread_cond_destroy(&end_cond);
	pthread_mutex_destroy(&begin_mtx);
	pthread_mutex_destroy(&end_mtx);

	free(connector_tptr);
	free(stat_data);

	return 0;
}

// EOF