/* IBM_PROLOG_BEGIN_TAG */ /* This is an automatically generated prolog. */ /* */ /* */ /* */ /* Licensed Materials - Property of IBM */ /* */ /* (C) COPYRIGHT International Business Machines Corp. 1999,2006 */ /* All Rights Reserved */ /* */ /* US Government Users Restricted Rights - Use, duplication or */ /* disclosure restricted by GSA ADP Schedule Contract with IBM Corp. */ /* */ /* IBM_PROLOG_END_TAG */ /* Multinode GPFS performance test program. Measures aggregate data transfer rate across several nodes for random, strided, or sequential access patterns. */ #ifdef GPFS_LINUX /* Use 64 bit version of stat, etc. */ #define _LARGEFILE_SOURCE #define _LARGEFILE64_SOURCE #define _FILE_OFFSET_BITS 64 #define PAGE_SIZE 4096 typedef long long offset_t; typedef struct timebasestruct { unsigned int tb_high; /* high 32 bits, or seconds */ unsigned int tb_low; /* low 32 bits, or nanoseconds */ } timebasestruct_t; void GetTOD(timebasestruct_t *tP); #endif #ifdef GPFS_AIX /* Use 64 bit version of stat, etc. 
*/ #define _LARGE_FILES #define _AIX_PTHREADS_D7 #define GetTOD(tP) read_real_time((tP),TIMEBASE_SZ) #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "irreg.h" #ifndef NO_AIO #ifdef GPFS_LINUX #define __USE_GNU #endif #ifdef GPFS_AIX #define _AIO_AIX_SOURCE #endif #include #endif #ifdef MULTI_NODE #include #endif #ifndef O_DIRECT #ifdef GPFS_LINUX #ifdef GPFS_ARCH_PPC64 #define O_DIRECT 0400000 #else #define O_DIRECT 040000 #endif #endif #ifdef GPFS_AIX #define O_DIRECT 0x08000000 #endif #endif #ifdef GPFS_LINUX #define CONST const #define RANDRET_T int32_t #define RANDOM_R(_r,_s) random_r(_s,_r); #define INITSTATE_R(_sd,_st,_sz,_rp,_rrs) initstate_r(_sd,_st,_sz,_rrs) #define SETSTATE_R(_rs,_rp,_rrs) setstate_r(_rs,_rrs) #define AIO_READ(_h,_p) aio_read(_p) #define AIO_WRITE(_h,_p) aio_write(_p) #define AIO_SUSPEND(_n,_p) aio_suspend(_p, _n, NULL) #define AIO_RETURN(_p) aio_return(&(_p)) #define AIO_ERROR(_p) aio_error(&(_p)) struct aioinit st_aioinit; #endif #ifdef GPFS_AIX #define CONST #define RANDRET_T long #define RANDOM_R(_r,_s) random_r(_r,_s); #define INITSTATE_R(_sd,_st,_sz,_rp,_rrs) initstate_r(_sd,_st,_sz,_rp,_rrs) #define SETSTATE_R(_rs,_rp,_rrs) setstate_r(_rs,_rp,_rrs) #define AIO_READ(_h,_p) aio_read(_h,_p) #define AIO_WRITE(_h,_p) aio_write(_h,_p) #define AIO_SUSPEND(_n,_p) aio_suspend(_n,_p) #define AIO_RETURN(_p) aio_return((_p).aio_handle) #define AIO_ERROR(_p) aio_error((_p).aio_handle) #endif /* Useful data types */ typedef unsigned int Boolean; #define true 1 #define false 0 typedef long long Int64; /* Process rank within job */ int ProcNum = -1; /* Total number of processes in job */ int NProcs = -1; /* Input parameters specified on command line */ enum { NoTest, CreateTest, WriteTest, ReadTest, UncacheTest } TestType = NoTest; enum { NoPattern, RandPattern, RandPatternWithHint, StridedPattern, SeqPattern } TestPattern = NoPattern; Boolean 
LabelledOutput = true; int RecordSize = -1; Int64 BytesToTransfer = -1; Int64 Stride = 0; int NThreadsPerProcess = 1; Boolean DoInvalidate = true; Boolean UseDataShipping = false; Boolean UseDirectIO = false; Boolean UseSharedMem = false; Boolean UseOsync = false; Boolean DoFsync = false; Boolean CycleData = true; Boolean TaskTimes = false; char* FileNameP = NULL; int Verbosity = 0; int DoFreeAll = false; Boolean UseAsyncIO = false; int AIODepth = 0; /* Maximum number of threads allowed */ #define MAX_THREADS 64 /* Values derived from input parameters */ Int64 RecordsInFile = -1; Int64 RecordsToTransfer = -1; /* Parameter passed to worker threads */ struct ThreadParm { /* For sequential and strided access, define the range of record numbers that will be accessed by this thread. The first record to be accessed is given by startRecordNum. If the range from startRecordNum to maxRecordNum is smaller than the number of records to be processed by this thread, the thread will cycle through the set of records from restartRecordNum to maxRecordNum as many times as is necessary to process the required number of records. Not used for random access patterns. */ Int64 startRecordNum; Int64 restartRecordNum; Int64 maxRecordNum; /* Number of records to be processed by this thread */ Int64 nRecords; /* Buffer address (possibly not page-aligned) */ char *origbufP; /* Page-aligned buffer of size RecordSize to be used by this thread */ char* bufP; /* Shared memory ID for buffer if using -shm option */ int shmid; /* Index of this entry */ int threadIndex; /* Number of seconds spent by this thread from before the first read or write until after the last read or write, not counting the file open or close. 
*/ double xferInterval; } ParmArray[MAX_THREADS]; /* Maximum size of a batch of prefetch requests given to the PrefetchedIrregularXfer class at one time */ #define MAX_PREFETCHES 1024 /* Print usage message only on process 0, then exit */ void Usage() { if (ProcNum == 0) fprintf(stderr, "\nUsage: gpfsperf operation pattern fn [options]\n" " operations:\n" " read - read from the file fn\n" " write - write to the file fn\n" " create - create fn if it does not exist, then do a write test\n" " uncache - remove cached blocks of fn from buffer pool, then exit\n" " access patterns\n" " rand - access random records\n" " randhint - access random records, but use GPFS multiple access range\n" " hint\n" " strided - access records according to a non-overlapped strided\n" " pattern\n" " seq - access records sequentially\n" " fn - name of file to be accessed\n" " options can be:\n" " -nolabels - produce a single line of output containing all parameters\n" " and the measured data rate. Format of the output line is\n" " 'op pattern fn recordSize nBytes fileSize nProcs nThreads \n" " strideRecs inv ds dio shm fsync reltoken aio osync rate util'\n" " (default is to produce multi-line labelled output)\n" " -r recsize - record size (defaults to filesystem block size)\n" " -n nBytes - number of bytes to transfer (defaults to file size)\n" " -s stride - number of bytes between successive accesses by the\n" " same thread. 
Only meaningful for strided access patterns.\n" " Must be a multiple of the record size.\n" " Default is number of threads times number of processes.\n" " -th nThreads - number of threads per process (defaults to 1)\n" " -noinv - do not clear blocks of fn from the GPFS file cache before\n" " starting the test (default is to clear the cache)\n" " -ds - use GPFS data shipping (default is not to use data\n" " shipping)\n" " -aio depth - use Asynch I/O, prefetching to depth (default 0, max 1000)\n" " -dio - use direct I/O (default is not to use direct I/O)\n" " -shm - use shared memory buffer, with pinned large pages if\n" " possible (default is not to use shared memory buffer)\n" " -reltoken - release byte range token after open\n" " -fsync - fsync the file at the conclusion of a write or create test\n" " to insure that no dirty data remain buffered (default is\n" " not to fsync the file)\n" " -nocycle - for the seq access pattern when -n is larger than\n" " filesize / number-of-threads, read the entire file on each\n" " node rather than repeatedly reading the same segment of\n" " the file\n" " -tTimes - print detailed task times\n" " -v - verbose tracing\n" " -V - very verbose tracing\n" "\n" " Numbers can be given using K, M, G, or T suffixes, in either case, to\n" " denote 2**10, 2**20, 2**30, or 2**40, respectively, or can have an R\n" " suffix to denote a multiple of the record size.\n" "\n" " Reported data rate is in units of 1000 bytes/second.\n"); exit(1); } /* Convert a string to a number. The string may contain yy* prefixes and suffixes K, M, G, or T in either case. The suffix r or R is interpreted as the record size as input by the -r parameter. It is an error to use an R suffix if the -r parameter has not already appeared. 
*/
/* Additional notes on GetNum:

   Accepted form is an optional leading '-', a run of decimal digits, and
   then exactly one of the following:
     end of string        the number itself
     '*', '-' or '+'      the number combined with GetNum() of the rest of
                          the string.  The right-hand side is evaluated
                          first, so "10-2-3" yields 10-(2-3).
     k/K, m/M, g/G, t/T   the number scaled by 2**10, 2**20, 2**30, 2**40
     r/R                  the number scaled by the global RecordSize

   Characters following a size suffix are not examined.  A string with no
   digits is treated as the number 0.

   Parameter p: NUL-terminated string to convert.
   Returns: the converted value.
   Errors: for any other terminating character, or for an R suffix while
   RecordSize is still -1, a message is written to stderr and the process
   exits with status 1. */
long long GetNum(const char* p)
{
  Int64 sign = +1;
  Int64 num = 0;
  const char* saveP = p;   /* original string, kept for the error message */

  if (*p == '-')
  {
    sign = -1;
    p += 1;
  }

  /* The argument of isdigit must be representable as unsigned char (or be
     EOF); passing a plain char that happens to be negative is undefined
     behavior, so convert explicitly. */
  while (isdigit((unsigned char)*p))
  {
    num = 10*num + ((*p) - '0');
    p += 1;
  }
  num *= sign;

  switch (*p)
  {
    case '\0':
      return num;

    case '*':
      return num * GetNum(p+1);

    case '-':
      return num - GetNum(p+1);

    case '+':
      return num + GetNum(p+1);

    case 'k':
    case 'K':
      return 1024LL * num;

    case 'm':
    case 'M':
      return 1024LL * 1024LL * num;

    case 'g':
    case 'G':
      return 1024LL * 1024LL * 1024LL * num;

    case 't':
    case 'T':
      return 1024LL * 1024LL * 1024LL * 1024LL * num;

    case 'r':
    case 'R':
      if (RecordSize == -1)
      {
        fprintf(stderr, "Cannot use suffix R before -r parameter\n");
        exit(1);
      }
      return (Int64)RecordSize * num;

    default:
      fprintf(stderr, "Invalid number %s\n", saveP);
      exit(1);
  }
  return 0;  /* eliminate compiler warning */
}


/* Convert a long integer to a string with a size suffix.  For example,
   2048 displays as 2K.  The largest of G, M or K that divides the value
   evenly is used; a value that is not a multiple of 1024 is printed as a
   plain decimal number.  Zero is printed as "0" rather than being treated
   as a multiple of G.

   Parameter in: value to format.
   Parameter bufP: caller-supplied buffer that receives the NUL-terminated
   text.  Its size is not passed in; the longest possible result is a
   64-bit decimal number with sign plus one suffix character.
   Returns: bufP. */
char* int64ToString(Int64 in, char* bufP)
{
  const Int64 K = 1024LL;
  const Int64 M = 1024LL*1024LL;
  const Int64 G = 1024LL*1024LL*1024LL;

  if (in == 0)
    sprintf(bufP, "%lld", in);
  else if ((in % G) == 0)
    sprintf(bufP, "%lldG", in/G);
  else if ((in % M) == 0)
    sprintf(bufP, "%lldM", in/M);
  else if ((in % K) == 0)
    sprintf(bufP, "%lldK", in/K);
  else
    sprintf(bufP, "%lld", in);
  return bufP;
}


/* Verify that a return code is 0.
If it is not, exit the program */ void CheckRC(int rc, const char* reasonP) { if (rc == 0) return; fprintf(stderr, "RC %d from %s, exitting\n", rc, reasonP); exit(1); } /* Compute the difference between two timestamps and return it as a floating point number of seconds */ double deltaTime(timebasestruct_t* startP, timebasestruct_t* stopP) { double dStartTime, dStopTime; #ifdef GPFS_AIX time_base_to_time(startP, TIMEBASE_SZ); time_base_to_time(stopP, TIMEBASE_SZ); #endif dStartTime = startP->tb_high + .000000001*startP->tb_low; dStopTime = stopP->tb_high + .000000001*stopP->tb_low; return dStopTime-dStartTime; } /* Do a series of random accesses to a file that has already been opened */ void DoRandTest(int handle, struct ThreadParm * parmP, const char* labelP) { timebasestruct_t time; uint seed; struct random_data rState; char randState[256]; char* randP; Int64 nRecordsLeft; RANDRET_T randRet; Int64 nextRecordNum; offset_t desiredOffset; offset_t actualOffset; offset_t len; /* Set the seed of the random number generator so each thread accesses a different sequence of records */ GetTOD(&time); seed = (time.tb_high ^ time.tb_low) + (parmP->threadIndex+1)*getpid(); memset(&rState, 0, sizeof(rState)); INITSTATE_R(seed, randState, sizeof(randState), &randP, &rState); SETSTATE_R(randState, &randP, &rState); /* Read or write random records */ nRecordsLeft = parmP->nRecords; while (nRecordsLeft > 0) { /* Generate the next record number */ RANDOM_R(&randRet, &rState); nextRecordNum = randRet % RecordsInFile; /* Seek to the next record */ desiredOffset = (offset_t) (nextRecordNum * RecordSize); actualOffset = lseek64(handle, desiredOffset, SEEK_SET); if (actualOffset != desiredOffset) { fprintf(stderr, "Error lseek64'ing to %lld in %s\n", desiredOffset, FileNameP); exit(1); } /* Read or write next record */ if (Verbosity >= 2) printf(" th %d: %s record %lld\n", parmP->threadIndex, labelP, nextRecordNum); if (TestType == ReadTest) len = read(handle, parmP->bufP, RecordSize); 
else len = write(handle, parmP->bufP, RecordSize); if (len != RecordSize) { fprintf(stderr, "Wrong length in %s: %lld\n", labelP, len); exit(1); } /* Decrement number of records remaining and loop */ nRecordsLeft -= 1; } } /* Do a series of random accesses to a file that has already been opened. Before each group of accesses, issue a GPFS multiple access range hint so GPFS can prefetch blocks before they are needed. */ void DoRandTestWithHint(int handle, struct ThreadParm * parmP, const char* labelP) { struct pixAccDesc * accP; struct PrefetchedIrregularXfer x; timebasestruct_t time; uint seed; struct random_data rState; char randState[256]; char* randP; Int64 nRecordsLeft; int nRecordsInChunk; int i; RANDRET_T randRet; Int64 nextRecordNum; int rc; int nBytes; /* Allocate space to describe a batch of random accesses */ accP = (struct pixAccDesc *)malloc(MAX_PREFETCHES*sizeof(struct pixAccDesc)); if (accP == NULL) { fprintf(stderr, "Cannot allocate access descriptors\n"); exit(1); } /* Initialize the PrefetchedIrregularXfer state */ pixInit(&x); /* Turn on tracing in the PrefetchedIrregularXfer class according to the -v flag */ if (Verbosity >= 2) pixSetTraceLevel(&x, 1); /* Set the seed of the random number generator so each thread accesses a different sequence of records */ GetTOD(&time); seed = (time.tb_high ^ time.tb_low) + (parmP->threadIndex+1)*getpid(); memset(&rState, 0, sizeof(rState)); INITSTATE_R(seed, randState, sizeof(randState), &randP, &rState); SETSTATE_R(randState, &randP, &rState); /* Read or write random records */ nRecordsLeft = parmP->nRecords; while (nRecordsLeft > 0) { /* Generate the next batch of record numbers */ if (nRecordsLeft >= MAX_PREFETCHES) nRecordsInChunk = MAX_PREFETCHES; else nRecordsInChunk = nRecordsLeft; for (i=0; ibufP, &nBytes); if (rc != 0) { fprintf(stderr, "Error %d from xfer %d\n", rc, i); exit(1); } if (nBytes != RecordSize) { fprintf(stderr, "xfer %d moved %d bytes instead of %d\n", i, nBytes, RecordSize); exit(1); } } 
/* Decrement number of records remaining and loop */ nRecordsLeft -= nRecordsInChunk; } /* Clean up PrefetchedIrregularXfer object, free any prefetched records, and free storage for access descriptions */ pixTerm(&x); free(accP); } /* Set a MAR for a file on this node. */ void FreeAllRanges(int handle) { struct { gpfsFcntlHeader_t hdr; gpfsFreeRange_t fr; } freeHint; int rc; /* Issue the free range hint */ freeHint.hdr.totalLength = sizeof(freeHint); freeHint.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION; freeHint.hdr.fcntlReserved = 0; freeHint.fr.structLen = sizeof(gpfsFreeRange_t); freeHint.fr.structType = GPFS_FREE_RANGE; freeHint.fr.start = 0; freeHint.fr.length = 0; rc = gpfs_fcntl(handle, &freeHint); if (rc != 0) { fprintf(stderr, "gpfs_fcntl free range hint failed for file '%s'. " "errno=%d errorOffset=%d\n", FileNameP, errno, freeHint.hdr.errorOffset); exit(1); } } /* Set a MAR for a file on this node. */ void MARange(int handle, offset_t blockNumber, offset_t startoff, offset_t blkLen, Boolean isWrite) { struct { gpfsFcntlHeader_t hdr; gpfsMultipleAccessRange_t marh; } accHint; int rc; /* Issue the access range hint */ accHint.hdr.totalLength = sizeof(accHint); accHint.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION; accHint.hdr.fcntlReserved = 0; accHint.marh.structLen = sizeof(accHint.marh); accHint.marh.structType = GPFS_MULTIPLE_ACCESS_RANGE; accHint.marh.accRangeCnt = 1; accHint.marh.relRangeCnt = 0; accHint.marh.accRangeArray[0].blockNumber = blockNumber; accHint.marh.accRangeArray[0].start = startoff; accHint.marh.accRangeArray[0].length = blkLen; accHint.marh.accRangeArray[0].isWrite = isWrite; rc = gpfs_fcntl(handle, &accHint); if (rc != 0) { fprintf(stderr, "gpfs_fcntl access range hint failed for file '%s'. 
" "errno=%d errorOffset=%d\n", FileNameP, errno, accHint.hdr.errorOffset); exit(1); } } /* Do a series of strided accesses to a file that has already been opened */ void DoStridedTest(int handle, struct ThreadParm * parmP, const char* labelP) { Int64 strideRecords; Int64 nextRecordNum; Int64 maxRecordNum; Int64 nRecordsLeft; offset_t desiredOffset; offset_t actualOffset; offset_t len; /* Read or write records according to the strided pattern. If the end of the file is reached, seek back to the start record. */ if (Stride != 0) strideRecords = Stride / RecordSize; else strideRecords = NThreadsPerProcess * NProcs; nextRecordNum = parmP->startRecordNum; maxRecordNum = parmP->maxRecordNum; nRecordsLeft = parmP->nRecords; while (nRecordsLeft > 0) { /* Seek to the next record */ desiredOffset = (offset_t) (nextRecordNum * RecordSize); actualOffset = lseek64(handle, desiredOffset, SEEK_SET); if (actualOffset != desiredOffset) { fprintf(stderr, "Error lseek64'ing to %lld in %s\n", desiredOffset, FileNameP); exit(1); } /* Read or write next record */ if (Verbosity >= 2) printf(" th %d: %s record %lld\n", parmP->threadIndex, labelP, nextRecordNum); if (TestType == ReadTest) len = read(handle, parmP->bufP, RecordSize); else len = write(handle, parmP->bufP, RecordSize); if (len != RecordSize) { fprintf(stderr, "Wrong length in %s: %lld\n", labelP, len); exit(1); } /* If next record would be outside of the area to be processed by this thread, go back to the beginning of the area */ nextRecordNum += strideRecords; if (nextRecordNum > maxRecordNum) nextRecordNum = parmP->restartRecordNum; nRecordsLeft -= 1; } } /* Do a series of sequential accesses to a file that has already been opened */ void DoSeqTest(int handle, struct ThreadParm * parmP, const char* labelP) { Int64 nextRecordNum; offset_t desiredOffset; offset_t actualOffset; Int64 maxRecordNum; Int64 nRecordsLeft; offset_t len; /* Seek to the beginning of the area that this thread should access */ nextRecordNum = 
parmP->startRecordNum; desiredOffset = (offset_t) (nextRecordNum * RecordSize); actualOffset = lseek64(handle, desiredOffset, SEEK_SET); if (actualOffset != desiredOffset) { fprintf(stderr, "Error lseek64'ing to %lld in %s\n", desiredOffset, FileNameP); exit(1); } /* Read or write records sequentially. If the end of the area to be accessed by this thread is reached, seek back to the beginning of the area. */ maxRecordNum = parmP->maxRecordNum; nRecordsLeft = parmP->nRecords; while (nRecordsLeft > 0) { /* Read or write next record */ if (Verbosity >= 2) printf(" th %d: %s record %lld\n", parmP->threadIndex, labelP, nextRecordNum); if (TestType == ReadTest) len = read(handle, parmP->bufP, RecordSize); else len = write(handle, parmP->bufP, RecordSize); if (len != RecordSize) { fprintf(stderr, "Wrong length in %s: %lld\n", labelP, len); exit(1); } /* If next record would be outside of the area to be processed by this thread, seek back to the beginning of the area */ nextRecordNum += 1; if (nextRecordNum > maxRecordNum) { nextRecordNum = parmP->restartRecordNum; desiredOffset = (offset_t) (nextRecordNum * RecordSize); actualOffset = lseek64(handle, desiredOffset, SEEK_SET); if (actualOffset != desiredOffset) { fprintf(stderr, "Error lseek64'ing to %lld in %s\n", desiredOffset, FileNameP); exit(1); } } nRecordsLeft -= 1; } } /* Do a series of strided accesses to a file that has already been opened, using AIO interface */ void DoStridedAIOTest(int handle, struct ThreadParm * parmP, const char* labelP) { #ifndef NO_AIO Int64 strideRecords; Int64 nextRecordNum; Int64 maxRecordNum; Int64 nRecordsLeft; int len; int i, rc; char *curbufP; int OpIndex, PrefetchIndex; struct aiocb *accP; /* Allocate space to describe a batch of accesses */ accP = (struct aiocb *)malloc(AIODepth*sizeof(struct aiocb)); if (accP == NULL) { fprintf(stderr, "Cannot allocate AIO descriptors\n"); exit(1); } /* Set pointers to the buffers to be read into */ curbufP = parmP->bufP; for (i = 0; i < 
AIODepth; i++) { accP[i].aio_buf = curbufP; accP[i].aio_nbytes = RecordSize; /*accP[i].aio_offset = 0; set later */ curbufP += RecordSize; #ifdef GPFS_AIX accP[i].aio_whence = SEEK_SET; accP[i].aio_flag = 0; /* not AIO_SIGNAL */ accP[i].aio_handle = 0; #else accP[i].aio_fildes = handle; accP[i].aio_lio_opcode = 0; accP[i].aio_reqprio = 0; accP[i].aio_sigevent.sigev_notify = SIGEV_NONE; #endif } /* Read or write records according to the strided pattern. If the end of the file is reached, seek back to the start record. */ if (Stride != 0) strideRecords = Stride / RecordSize; else strideRecords = NThreadsPerProcess * NProcs; nextRecordNum = parmP->startRecordNum; maxRecordNum = parmP->maxRecordNum; nRecordsLeft = parmP->nRecords; OpIndex = 0; PrefetchIndex = 0; while (nRecordsLeft > 0) { if (PrefetchIndex >= AIODepth) { CONST struct aiocb *accPP[] = { &accP[OpIndex] }; /* All prefetches done, wait for next buffer before starting next prefetch */ do { rc = AIO_SUSPEND(1, accPP); if (rc < 0 && errno != EINTR) { fprintf(stderr, "Error %d from aio_suspend %d\n", errno, OpIndex); exit(1); } } while (rc != 0); len = AIO_RETURN(accP[OpIndex]); if (len < 0) { fprintf(stderr, "Error %d from AIO %d\n", AIO_ERROR(accP[OpIndex]), OpIndex); exit(1); } if (len != RecordSize) { fprintf(stderr, "AIO %d moved %d bytes instead of %d\n", OpIndex, len, RecordSize); exit(1); } PrefetchIndex = AIODepth; /* don't overflow PrefetchIndex */ } /* Read or write next record */ accP[OpIndex].aio_offset = (offset_t)nextRecordNum * (offset_t)RecordSize; if (Verbosity >= 2) printf(" th %d: %s record %lld\n", parmP->threadIndex, labelP, nextRecordNum); /* Start the next read or write */ do { if (TestType == ReadTest) rc = AIO_READ(handle, &accP[OpIndex]); else rc = AIO_WRITE(handle, &accP[OpIndex]); if (rc < 0) { if (errno != EAGAIN) { fprintf(stderr, "Error %d from aio_read/write %d\n", errno, OpIndex); exit(1); } } } while (rc != 0); /* If next record would be outside of the area to be processed by 
this thread, go back to the beginning of the area */ nextRecordNum += strideRecords; if (nextRecordNum > maxRecordNum) nextRecordNum = parmP->restartRecordNum; nRecordsLeft -= 1; OpIndex++; if (OpIndex >= AIODepth) OpIndex = 0; PrefetchIndex++; } #endif /* ! NO_AIO */ } /* Do a series of sequential accesses to a file that has already been opened, using AIO interface */ void DoSeqAIOTest(int handle, struct ThreadParm * parmP, const char* labelP) { #ifndef NO_AIO Int64 nextRecordNum; Int64 maxRecordNum; Int64 nRecordsLeft; int len; int i, rc; char *curbufP; int OpIndex, PrefetchIndex; struct aiocb *accP; /* Allocate space to describe a batch of accesses */ accP = (struct aiocb *)malloc(AIODepth*sizeof(struct aiocb)); if (accP == NULL) { fprintf(stderr, "Cannot allocate AIO descriptors\n"); exit(1); } /* Set pointers to the buffers to be read into */ curbufP = parmP->bufP; for (i = 0; i < AIODepth; i++) { accP[i].aio_buf = curbufP; accP[i].aio_nbytes = RecordSize; /*accP[i].aio_offset = 0; set later */ curbufP += RecordSize; #ifdef GPFS_AIX accP[i].aio_whence = SEEK_SET; accP[i].aio_flag = 0; /* not AIO_SIGNAL */ accP[i].aio_handle = 0; #else accP[i].aio_fildes = handle; accP[i].aio_lio_opcode = 0; accP[i].aio_reqprio = 0; accP[i].aio_sigevent.sigev_notify = SIGEV_NONE; #endif } /* Seek to the beginning of the area that this thread should access */ nextRecordNum = parmP->startRecordNum; /* Read or write records sequentially. If the end of the area to be accessed by this thread is reached, seek back to the beginning of the area. 
*/ maxRecordNum = parmP->maxRecordNum; nRecordsLeft = parmP->nRecords; OpIndex = 0; PrefetchIndex = 0; while (nRecordsLeft > 0) { if (PrefetchIndex >= AIODepth) { CONST struct aiocb *accPP[] = { &accP[OpIndex] }; /* All prefetches done, wait for next buffer before starting next prefetch */ do { rc = AIO_SUSPEND(1, accPP); if (rc < 0 && errno != EINTR) { fprintf(stderr, "Error %d from aio_suspend %d\n", errno, OpIndex); exit(1); } } while (rc != 0); len = AIO_RETURN(accP[OpIndex]); if (len < 0) { fprintf(stderr, "Error %d from AIO %d\n", AIO_ERROR(accP[OpIndex]), OpIndex); exit(1); } if (len != RecordSize) { fprintf(stderr, "AIO %d moved %d bytes instead of %d\n", OpIndex, len, RecordSize); exit(1); } PrefetchIndex = AIODepth; /* don't overflow PrefetchIndex */ } /* Read or write next record */ accP[OpIndex].aio_offset = (offset_t)nextRecordNum * (offset_t)RecordSize; if (Verbosity >= 2) printf(" th %d: %s record %lld\n", parmP->threadIndex, labelP, nextRecordNum); /* Start the next read or write */ do { if (TestType == ReadTest) rc = AIO_READ(handle, &accP[OpIndex]); else rc = AIO_WRITE(handle, &accP[OpIndex]); if (rc < 0) { if (errno != EAGAIN) { fprintf(stderr, "Error %d from aio_read/write %d\n", errno, OpIndex); exit(1); } } } while (rc != 0); /* If next record would be outside of the area to be processed by this thread, seek back to the beginning of the area */ nextRecordNum += 1; if (nextRecordNum > maxRecordNum) nextRecordNum = parmP->restartRecordNum; nRecordsLeft -= 1; OpIndex++; if (OpIndex >= AIODepth) OpIndex = 0; PrefetchIndex++; } #endif /* ! NO_AIO */ } /* Do a series of random accesses to a file that has already been opened. Issue a AIO calls to prefetch blocks before they are needed. 
*/ void DoRandAIOTest(int handle, struct ThreadParm * parmP, const char* labelP) { #ifndef NO_AIO timebasestruct_t time; uint seed; struct random_data rState; char randState[256]; char* randP; Int64 nRecordsLeft; RANDRET_T randRet; Int64 nextRecordNum; int i, rc; int len; char *curbufP; int OpIndex, PrefetchIndex; struct aiocb *accP; /* Allocate space to describe a batch of random accesses */ accP = (struct aiocb *)malloc(AIODepth*sizeof(struct aiocb)); if (accP == NULL) { fprintf(stderr, "Cannot allocate AIO descriptors\n"); exit(1); } /* Set pointers to the buffers to be read into */ curbufP = parmP->bufP; for (i = 0; i < AIODepth; i++) { accP[i].aio_buf = curbufP; accP[i].aio_nbytes = RecordSize; /*accP[i].aio_offset = 0; set later */ curbufP += RecordSize; #ifdef GPFS_AIX accP[i].aio_whence = SEEK_SET; accP[i].aio_flag = 0; /* not AIO_SIGNAL */ accP[i].aio_handle = 0; #else accP[i].aio_fildes = handle; accP[i].aio_lio_opcode = 0; accP[i].aio_reqprio = 0; accP[i].aio_sigevent.sigev_notify = SIGEV_NONE; #endif } /* Set the seed of the random number generator so each thread accesses a different sequence of records */ GetTOD(&time); seed = (time.tb_high ^ time.tb_low) + (parmP->threadIndex+1)*getpid(); memset(&rState, 0, sizeof(rState)); INITSTATE_R(seed, randState, sizeof(randState), &randP, &rState); SETSTATE_R(randState, &randP, &rState); /* Read or write random records */ OpIndex = 0; PrefetchIndex = 0; nRecordsLeft = parmP->nRecords; while (nRecordsLeft > 0) { if (PrefetchIndex >= AIODepth) { CONST struct aiocb *accPP[] = { &accP[OpIndex] }; /* All prefetches done, wait for next buffer before starting next prefetch */ do { rc = AIO_SUSPEND(1, accPP); if (rc < 0 && errno != EINTR) { fprintf(stderr, "Error %d from aio_suspend %d\n", errno, OpIndex); exit(1); } } while (rc != 0); len = AIO_RETURN(accP[OpIndex]); if (len < 0) { fprintf(stderr, "Error %d from AIO %d\n", AIO_ERROR(accP[OpIndex]), OpIndex); exit(1); } if (len != RecordSize) { fprintf(stderr, "AIO %d 
moved %d bytes instead of %d\n", OpIndex, len, RecordSize); exit(1); } PrefetchIndex = AIODepth; /* don't overflow PrefetchIndex */ } /* Generate the next record number */ RANDOM_R(&randRet, &rState); nextRecordNum = randRet % RecordsInFile; accP[OpIndex].aio_offset = (Int64)nextRecordNum * (Int64)RecordSize; /* Start the next read or write */ do { if (TestType == ReadTest) rc = AIO_READ(handle, &accP[OpIndex]); else rc = AIO_WRITE(handle, &accP[OpIndex]); if (rc < 0) { if (errno != EAGAIN) { fprintf(stderr, "Error %d from aio_read/write %d\n", errno, OpIndex); exit(1); } } } while (rc != 0); /* Decrement number of records remaining and loop */ nRecordsLeft--; OpIndex++; if (OpIndex >= AIODepth) OpIndex = 0; PrefetchIndex++; } /* Clean up aiocbs */ free(accP); #endif /* ! NO_AIO */ } /* Start up data shipping. The second parameter is the total number of open instances on all nodes that will be operating on the file. Must be called for every such instance with the same value of nInsts. */ void StartDataShipping(int handle, int nInsts) { struct { gpfsFcntlHeader_t hdr; gpfsDataShipStart_t start; } dsStart; int rc; dsStart.hdr.totalLength = sizeof(dsStart); dsStart.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION; dsStart.hdr.fcntlReserved = 0; dsStart.start.structLen = sizeof(gpfsDataShipStart_t); dsStart.start.structType = GPFS_DATA_SHIP_START; dsStart.start.numInstances = nInsts; dsStart.start.reserved = 0; rc = gpfs_fcntl(handle, &dsStart); if (rc != 0) { fprintf(stderr, "gpfs_fcntl DS start directive failed. " "errno=%d errorOffset=%d\n", errno, dsStart.hdr.errorOffset); exit(1); } } /* Shut down data shipping. Must be called for every handle for which StartDataShipping was called. 
*/ void StopDataShipping(int handle) { struct { gpfsFcntlHeader_t hdr; gpfsDataShipStop_t stop; } dsStop; int rc; dsStop.hdr.totalLength = sizeof(dsStop); dsStop.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION; dsStop.hdr.fcntlReserved = 0; dsStop.stop.structLen = sizeof(dsStop.stop); dsStop.stop.structType = GPFS_DATA_SHIP_STOP; rc = gpfs_fcntl(handle, &dsStop); if (rc != 0) printf("gpfs_fcntl DS stop directive failed. errno=%d errorOffset=%d\n", errno, dsStop.hdr.errorOffset); } /* Body of one worker thread. Parameter is the local thread index. */ void* TestThreadBody(void* parm) { long th = (long)parm; struct ThreadParm * parmP= &ParmArray[th]; int openFlags; timebasestruct_t afterOpenTime, beforeCloseTime; int handle; char* labelP; int rc; /* Dump thread info if verbose tracing enabled */ if (Verbosity >= 1) printf("%d: th %d starting startRecordNum %lld restartRecordNum %lld maxRecordNum %lld nRecords %lld\n", ProcNum, parmP->threadIndex, parmP->startRecordNum, parmP->restartRecordNum, parmP->maxRecordNum, parmP->nRecords); /* Open the file with the appropriate flags */ switch (TestType) { case CreateTest: if (ProcNum == 0) openFlags = O_CREAT | O_WRONLY; else openFlags = O_WRONLY; break; case WriteTest: openFlags = O_WRONLY; break; case ReadTest: openFlags = O_RDONLY; break; default: openFlags = 0; break; } if (UseDirectIO) openFlags |= O_DIRECT; if (UseOsync) openFlags |= O_SYNC; #ifdef MULTI_NODE if (TestType == CreateTest) { /* Create file on process 0 node first, then open on the rest */ if (ProcNum == 0) { handle = open(FileNameP, openFlags, 0666); if (DoFreeAll) FreeAllRanges(handle); } rc = MPI_Barrier(MPI_COMM_WORLD); CheckRC(rc, "start MPI_Barrier"); if (ProcNum != 0) handle = open(FileNameP, openFlags, 0666); } else handle = open(FileNameP, openFlags, 0666); #else handle = open(FileNameP, openFlags, 0666); #endif if (handle == -1) { fprintf(stderr, "Could not open file %s, errno=%d\n", FileNameP, errno); exit(1); } /* Set up data shipping if necessary 
*/
  /* The second argument is the thread count per process multiplied by the
     number of processes in the job. */
  if (UseDataShipping)
    StartDataShipping(handle, NThreadsPerProcess * NProcs);

  /* Capture start timestamp for transfers by this thread */
  GetTOD(&afterOpenTime);

  /* Read or write the requested number of records according to the
     correct access pattern.  labelP is passed to whichever test routine
     is selected: "read" for ReadTest, "write" for every other test type. */
  if (TestType == ReadTest)
    labelP = "read";
  else
    labelP = "write";
  switch (TestPattern)
  {
    case RandPattern:
      if (UseAsyncIO)
        DoRandAIOTest(handle, parmP, labelP);
      else
        DoRandTest(handle, parmP, labelP);
      break;

    /* Only one routine is called for this pattern; UseAsyncIO is not
       consulted here. */
    case RandPatternWithHint:
      DoRandTestWithHint(handle, parmP, labelP);
      break;

    case StridedPattern:
      if (UseAsyncIO)
        DoStridedAIOTest(handle, parmP, labelP);
      else
        DoStridedTest(handle, parmP, labelP);
      break;

    case SeqPattern:
      if (UseAsyncIO)
        DoSeqAIOTest(handle, parmP, labelP);
      else
        DoSeqTest(handle, parmP, labelP);
      break;

    /* Any other pattern value (e.g. NoPattern) transfers nothing */
    default:
      break;
  }

  /* Timestamp end of transfers and remember how long the thread ran.
     The interval is measured between the two GetTOD calls in this routine,
     so the close below is not included in it. */
  GetTOD(&beforeCloseTime);
  parmP->xferInterval = deltaTime(&afterOpenTime, &beforeCloseTime);

  /* Shut down data shipping if it was in effect */
  if (UseDataShipping)
    StopDataShipping(handle);

  /* Close the file and print progress indicator */
  rc = close(handle);
  CheckRC(rc, "closing file");
  if (Verbosity >= 1)
    printf("%d: th %d finished, did all transfers in %.2f seconds\n",
           ProcNum, parmP->threadIndex, parmP->xferInterval);

  /* Terminate this thread; no value is returned to a joiner */
  pthread_exit(NULL);
}


/* Invalidate all cached data held on behalf of a file on this node.

   invFileNameP  path of the file whose cached data should be discarded.

   Returns silently when the file cannot be opened.  Terminates the whole
   process with exit(1), after printing a message to stderr, if the
   gpfs_fcntl call or the subsequent close fails. */
void InvalidateFileCache(char* invFileNameP)
{
  int handle;
  /* Argument block for gpfs_fcntl: a header followed by a single
     clear-file-cache entry. */
  struct
  {
    gpfsFcntlHeader_t hdr;
    gpfsClearFileCache_t inv;
  } invCacheHint;
  int rc;

  /* Open the file.  If the open fails, the file cannot be cached, so
     there is nothing to invalidate. */
  handle = open(invFileNameP, O_RDONLY, 0);
  if (handle == -1)
    return;

  /* Issue the invalidate hint.  totalLength covers the whole argument
     block; structLen covers only the clear-file-cache entry. */
  invCacheHint.hdr.totalLength = sizeof(invCacheHint);
  invCacheHint.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
  invCacheHint.hdr.fcntlReserved = 0;
  invCacheHint.inv.structLen = sizeof(gpfsClearFileCache_t);
  invCacheHint.inv.structType = GPFS_CLEAR_FILE_CACHE;
  rc = gpfs_fcntl(handle, &invCacheHint);
  if (rc != 0)
  {
    /* The descriptor is still open here; the process exits immediately */
    fprintf(stderr, "gpfs_fcntl clear cache hint failed for file '%s'. "
            "errno=%d errorOffset=%d\n",
            invFileNameP, errno, invCacheHint.hdr.errorOffset);
    exit(1);
  }

  /* Close the file */
  rc = close(handle);
  if (rc == -1)
  {
    fprintf(stderr,
            "Could not close file '%s' after flushing file cache, errno=%d\n",
            invFileNameP, errno);
    exit(1);
  }
}


/* This routine is called at exit to clean up any shared memory segments
   that might have been created if the UseSharedMem option was selected.
   For each ParmArray entry that passes the loop's test it detaches the
   buffer at origbufP and marks the segment identified by shmid for
   removal. */
static void cleanupSegs()
{
  int th;
  /* NOTE(review): the loop header below is truncated -- the text between
     "th" and "= 0" (presumably the loop bound, the increment and the start
     of a test on the segment id) is missing from this copy of the file.
     Restore it from the original source; do not compile as is. */
  for (th=0; th= 0)
  {
    shmdt(ParmArray[th].origbufP);
    shmctl(ParmArray[th].shmid, IPC_RMID, NULL);
  }
}


/* Main routine.  The visible part initializes MPI (when built with
   MULTI_NODE), validates the command-line settings, optionally invalidates
   the file's cached data, and derives the record counts used by the test.
   NOTE(review): several stretches of this function are missing from this
   copy of the file; each gap is flagged where it occurs. */
int main(int argc, char *argvPP[])
{
  int i, j, rc, len;
  struct stat statBuf;
  int handle;
  long th;
  Int64 baseRecordsPerThreadToXfer;
  Int64 threadsWithExtraRecordToXfer;
  Int64 baseRecordsPerThreadInFile;
  Int64 threadsWithExtraRecordInFile;
  int gblThreadIndex;
  char* p;
  char buf1[32], buf2[32], buf3[32];
  pthread_t threadIds[MAX_THREADS];
  pthread_attr_t threadAttr;
  timebasestruct_t startTime, endTime;
  double totalSeconds;
  double threadTimes[MAX_THREADS];
  double* allXferTimesP = NULL;
  double totalXferTimes;
  double threadUtil;
  double dataRate;
  double* dP;

  /* Initialize MPI communication.  Without MULTI_NODE the program runs as
     a single process with rank 0. */
#ifdef MULTI_NODE
  MPI_Init(&argc, &argvPP);
  MPI_Comm_rank(MPI_COMM_WORLD, &ProcNum);
  MPI_Comm_size(MPI_COMM_WORLD, &NProcs);
#else
  ProcNum = 0;
  NProcs = 1;
#endif

  /* Parse arguments */
  /* NOTE(review): everything between "i" and "1)" is missing from this copy
     of the file -- presumably the whole argument loop, the stat of the
     file that sets rc/statBuf, and the start of the -aio checks.  The
     braces below therefore do not balance as shown. */
  for (i=1; i 1)
  {
    fprintf(stderr, "Multiple threads per process cannot be used with -aio\n");
    exit(1);
  }
  /* NOTE(review): the test accepts AIODepth == 1000 although the message
     says "less than 1000"; confirm which bound is intended. */
  if (AIODepth <= 0 || AIODepth > 1000)
  {
    fprintf(stderr, "-aio depth must be greater than 0 and less than 1000\n");
    exit(1);
  }
  }

  /* If the file already exists, issue the GPFS hint that flushes and
     invalidates all cached data belonging to the file so that each test
     begins in the same state.
     NOTE(review): rc is presumably the result of a stat on FileNameP made
     in the missing section above -- confirm. */
  if (rc == 0 && (DoInvalidate || TestType == UncacheTest))
  {
    InvalidateFileCache(FileNameP);
    if (TestType == UncacheTest)
    {
      /* Nothing else to do.  Wait until all processes have reached this
         point, then exit */
#ifdef MULTI_NODE
      rc = MPI_Barrier(MPI_COMM_WORLD);
      CheckRC(rc, "finish MPI_Barrier");
#endif
      /* Only process 0 reports, so the line is printed once per job */
      if (ProcNum == 0)
        printf("uncache %s\n", FileNameP);
#ifdef MULTI_NODE
      MPI_Finalize();
#endif
      return 0;
    }
  }

  /* Continue validating arguments after cache invalidation.  Each message
     is printed by process 0 only, but Usage() is called by every process.
     NOTE(review): execution continues past Usage() unless it terminates
     the process -- confirm that it does. */
  if (TestType == NoTest)
  {
    if (ProcNum == 0) fprintf(stderr, "No test type given\n");
    Usage();
  }
  if (TestPattern == NoPattern)
  {
    if (ProcNum == 0) fprintf(stderr, "No access pattern given\n");
    Usage();
  }
  if (Stride != 0 && TestPattern != StridedPattern)
  {
    if (ProcNum == 0)
      fprintf(stderr, "Cannot specify -s unless access pattern is strided\n");
    Usage();
  }
  if (NThreadsPerProcess <= 0 || NThreadsPerProcess > MAX_THREADS)
  {
    if (ProcNum == 0) fprintf(stderr, "Invalid number of threads\n");
    Usage();
  }

  /* If record size or number of bytes to transfer are not known (still
     -1), use the information from the stat.
     NOTE(review): statBuf is read here without a visible check that the
     stat succeeded; confirm the missing section handles a failed stat. */
  if (RecordSize == -1)
    RecordSize = statBuf.st_blksize;
  if (BytesToTransfer == -1)
    BytesToTransfer = (Int64)statBuf.st_size;

  /* Make sure that -r and -n are known */
  if (RecordSize == -1)
  {
    if (ProcNum == 0)
      fprintf(stderr, "Record size not known; use -r option\n");
    exit(1);
  }
  if (BytesToTransfer == -1)
  {
    if (ProcNum == 0)
      fprintf(stderr, "Bytes to transfer not known; use -n option\n");
    exit(1);
  }

  /* Make sure -s is legal.
     NOTE(review): a RecordSize of 0 would divide by zero here and in the
     record-count computation below; confirm that -r is range-checked when
     it is parsed. */
  if (Stride != 0 && (Stride%RecordSize) != 0)
  {
    if (ProcNum == 0)
      fprintf(stderr, "Stride must be a multiple of record size\n");
    exit(1);
  }

  /* Compute number of records to read and write as well as number of
     records in the file.  BytesToTransfer is then recomputed as a whole
     number of records. */
  RecordsInFile = (Int64)statBuf.st_size / RecordSize;
  RecordsToTransfer = BytesToTransfer / RecordSize;
  BytesToTransfer = RecordsToTransfer * RecordSize;

  /* Make sure the file is big enough to allow non-overlapping accesses:
     for strided and sequential patterns the file must hold at least one
     record per thread in the whole job. */
  if (NThreadsPerProcess*NProcs > RecordsInFile &&
      (TestPattern==StridedPattern || TestPattern==SeqPattern))
  {
    if (ProcNum == 0)
      fprintf(stderr, "File too small to allow non-overlapping accesses\n");
    exit(1);
  }

  /* If using shared memory, register a routine to clean up segments on
     exit */
  /* NOTE(review): the text between "th" and "= 1)" is missing from this
     copy of the file -- presumably the rest of this loop, the thread
     creation and join logic, the elapsed-time computation and the start of
     a verbosity test.  Restore it from the original source. */
  if (UseSharedMem)
  {
    for (th=0; th= 1)
      printf("%d: totalSeconds %f\n", ProcNum, totalSeconds);

  /* Thread utilization or efficiency: we would like to know if all
     threads took the same amount of time or if some threads finished
     significantly before others.  For example, suppose the test involved
     4 threads and ran for a total of 10 seconds.  If all four threads
     required the full 10 seconds to do all of their reads or writes, the
     utilization would be (10+10+10+10)/(4*10) = 1.000.  On the other hand,
     if 3 of the threads took 4 seconds each and the last one took 9
     seconds, the utilization would be (4+4+4+9)/(4*10) = 0.525.
     Gather the thread run time information from all processes to
     process 0. */
  /* NOTE(review): the text between "th" and "tb_high" is missing from this
     copy of the file -- presumably the remainder of main and the opening
     of the Linux GetTOD routine.  The two assignments below store seconds
     in tb_high and microseconds scaled by 1000 (nanoseconds) in tb_low. */
  for (th=0; thtb_high = time.tv_sec;
  tP->tb_low = time.tv_usec * 1000;
}
#endif