source: gpfs_3.1_ker2.6.20/lpp/mmfs/samples/perf/gpfsperf.c @ 211

Last change on this file since 211 was 16, checked in by rock, 17 years ago
File size: 57.4 KB
Line 
1/* IBM_PROLOG_BEGIN_TAG                                                   */
2/* This is an automatically generated prolog.                             */
3/*                                                                        */
4/*                                                                        */
5/*                                                                        */
6/* Licensed Materials - Property of IBM                                   */
7/*                                                                        */
8/* (C) COPYRIGHT International Business Machines Corp. 1999,2006          */
9/* All Rights Reserved                                                    */
10/*                                                                        */
11/* US Government Users Restricted Rights - Use, duplication or            */
12/* disclosure restricted by GSA ADP Schedule Contract with IBM Corp.      */
13/*                                                                        */
14/* IBM_PROLOG_END_TAG                                                     */
15
16/* Multinode GPFS performance test program.  Measures aggregate data
17   transfer rate across several nodes for random, strided, or sequential
18   access patterns. */
19
#ifdef GPFS_LINUX
/* Use 64 bit version of stat, etc.  These feature-test macros only take
   effect if they are defined before the first system header is included,
   which is why this block precedes the #include lines. */
#define _LARGEFILE_SOURCE
#define _LARGEFILE64_SOURCE
#define _FILE_OFFSET_BITS 64

/* Linux stand-ins for definitions that AIX supplies in its own headers */
#define PAGE_SIZE 4096
typedef long long offset_t;
typedef struct timebasestruct
{
  unsigned int tb_high; /* high 32 bits, or seconds */
  unsigned int tb_low;  /* low 32 bits, or nanoseconds */
} timebasestruct_t;

/* Read the current time into *tP.  The definition is not in this excerpt;
   presumably it stores seconds in tb_high and nanoseconds in tb_low, which
   is how deltaTime() interprets the two fields. */
void GetTOD(timebasestruct_t *tP);
#endif

#ifdef GPFS_AIX
/* Use 64 bit version of stat, etc. */
#define _LARGE_FILES
#define _AIX_PTHREADS_D7

/* On AIX, read the raw time base; deltaTime() converts it to seconds and
   nanoseconds with time_base_to_time() before computing a difference. */
#define GetTOD(tP) read_real_time((tP),TIMEBASE_SZ)
#endif
44
45#include <pthread.h>
46#include <stdio.h>
47#include <fcntl.h>
48#include <errno.h>
49#include <sys/time.h>
50#include <sys/stat.h>
51#include <sys/ipc.h>
52#include <sys/shm.h>
53#include <assert.h>
54#include <time.h>
55#include <stdlib.h>
56#include <unistd.h>
57#include <ctype.h>
58#include <string.h>
59#include <gpfs_fcntl.h>
60#include "irreg.h"
61#ifndef NO_AIO
62#ifdef GPFS_LINUX
63#define __USE_GNU
64#endif
65#ifdef GPFS_AIX
66#define _AIO_AIX_SOURCE
67#endif
68#include <aio.h>
69#endif
70
71#ifdef MULTI_NODE
72#include <mpi.h>
73#endif
74
/* Fall back to the platform's numeric O_DIRECT value when the system
   headers do not define the flag. */
#ifndef O_DIRECT
#ifdef GPFS_LINUX
#ifdef GPFS_ARCH_PPC64
#define O_DIRECT 0400000
#else
#define O_DIRECT 040000
#endif
#endif
#ifdef GPFS_AIX
#define O_DIRECT 0x08000000
#endif
#endif

/* Adapters hiding the differences between the Linux (glibc) and AIX
   signatures of the reentrant random-number functions and of the AIO
   calls.  Each function-like macro expands to a single expression with no
   trailing semicolon.  The caller supplies the semicolon, so a use reads
   and behaves like an ordinary call, including as the body of an
   unbraced if/else. */
#ifdef GPFS_LINUX
#define CONST const
#define RANDRET_T int32_t
/* glibc order: random_r(struct random_data *buf, int32_t *result) */
#define RANDOM_R(_r,_s) random_r(_s,_r)
/* glibc initstate_r/setstate_r take no separate state-pointer argument,
   so _rp is dropped */
#define INITSTATE_R(_sd,_st,_sz,_rp,_rrs) initstate_r(_sd,_st,_sz,_rrs)
#define SETSTATE_R(_rs,_rp,_rrs) setstate_r(_rs,_rrs)

/* POSIX AIO: the aiocb itself identifies the request; _h is unused */
#define AIO_READ(_h,_p) aio_read(_p)
#define AIO_WRITE(_h,_p) aio_write(_p)
#define AIO_SUSPEND(_n,_p) aio_suspend(_p, _n, NULL)
#define AIO_RETURN(_p) aio_return(&(_p))
#define AIO_ERROR(_p) aio_error(&(_p))
/* NOTE(review): not referenced in this excerpt; presumably passed to
   aio_init() elsewhere to tune glibc's AIO thread pool. */
struct aioinit st_aioinit;
#endif
#ifdef GPFS_AIX
#define CONST
#define RANDRET_T long
/* AIX order: result pointer first, then the random_data state */
#define RANDOM_R(_r,_s) random_r(_r,_s)
#define INITSTATE_R(_sd,_st,_sz,_rp,_rrs) initstate_r(_sd,_st,_sz,_rp,_rrs)
#define SETSTATE_R(_rs,_rp,_rrs) setstate_r(_rs,_rp,_rrs)

/* AIX AIO: the calls take the file handle, and completed requests are
   queried through aiocb.aio_handle */
#define AIO_READ(_h,_p) aio_read(_h,_p)
#define AIO_WRITE(_h,_p) aio_write(_h,_p)
#define AIO_SUSPEND(_n,_p) aio_suspend(_n,_p)
#define AIO_RETURN(_p) aio_return((_p).aio_handle)
#define AIO_ERROR(_p) aio_error((_p).aio_handle)
#endif
115
/* Useful data types */
typedef unsigned int Boolean;
#define true 1
#define false 0
typedef long long Int64;   /* 64-bit signed type for offsets, sizes and counts */

/* Process rank within job (-1 until initialized; Usage() prints only when
   this is 0) */
int ProcNum = -1;

/* Total number of processes in job */
int NProcs = -1;

/* Input parameters specified on command line.
   NOTE(review): the option names in the trailing comments below are
   inferred from the Usage() text; the argument parser is not in this
   excerpt, so confirm the associations there. */
enum { NoTest, CreateTest, WriteTest, ReadTest, UncacheTest } TestType = NoTest;
enum
{
  NoPattern, RandPattern, RandPatternWithHint, StridedPattern, SeqPattern
} TestPattern = NoPattern;
Boolean LabelledOutput = true;     /* -nolabels */
int RecordSize = -1;               /* -r; -1 means "not yet given" (GetNum's R
                                      suffix is rejected while it is -1) */
Int64 BytesToTransfer = -1;        /* -n */
Int64 Stride = 0;                  /* -s, in bytes; 0 selects the default of
                                      NThreadsPerProcess*NProcs records */
int NThreadsPerProcess = 1;        /* -th */
Boolean DoInvalidate = true;       /* -noinv */
Boolean UseDataShipping = false;   /* -ds */
Boolean UseDirectIO = false;       /* -dio */
Boolean UseSharedMem = false;      /* -shm */
Boolean UseOsync = false;
Boolean DoFsync = false;           /* -fsync */
Boolean CycleData = true;          /* -nocycle */
Boolean TaskTimes = false;         /* -tTimes */
char* FileNameP = NULL;            /* fn operand; quoted in error messages */
int Verbosity = 0;                 /* >= 2 enables per-record trace output */
int DoFreeAll = false;             /* holds a Boolean value; presumably
                                      selects FreeAllRanges() - confirm */
Boolean UseAsyncIO = false;        /* -aio */
int AIODepth = 0;                  /* -aio depth: number of AIO control
                                      blocks (and record buffers) per thread */

/* Maximum number of threads allowed (size of ParmArray) */
#define MAX_THREADS 64

/* Values derived from input parameters */
Int64 RecordsInFile = -1;
Int64 RecordsToTransfer = -1;

/* Parameter passed to worker threads */
struct ThreadParm
{
  /* For sequential and strided access, define the range of record
     numbers that will be accessed by this thread.  The first record to
     be accessed is given by startRecordNum.  If the range from
     startRecordNum to maxRecordNum is smaller than the number of
     records to be processed by this thread, the thread will cycle
     through the set of records from restartRecordNum to maxRecordNum as
     many times as is necessary to process the required number of
     records.  Not used for random access patterns. */
  Int64 startRecordNum;
  Int64 restartRecordNum;
  Int64 maxRecordNum;

  /* Number of records to be processed by this thread */
  Int64 nRecords;

  /* Buffer address (possibly not page-aligned) */
  char *origbufP;

  /* Page-aligned buffer of size RecordSize to be used by this thread
     (the AIO tests use AIODepth consecutive RecordSize slots of it) */
  char* bufP;

  /* Shared memory ID for buffer if using -shm option */
  int shmid;

  /* Index of this entry */
  int threadIndex;

  /* Number of seconds spent by this thread from before the first read or
     write until after the last read or write, not counting the file open
     or close. */
  double xferInterval;
} ParmArray[MAX_THREADS];

/* Maximum size of a batch of prefetch requests given to the
   PrefetchedIrregularXfer class at one time */
#define MAX_PREFETCHES 1024
199
200
201/* Print usage message only on process 0, then exit */
202void Usage()
203{
204  if (ProcNum == 0)
205    fprintf(stderr,
206            "\nUsage: gpfsperf operation pattern fn [options]\n"
207              "  operations:\n"
208              "    read         - read from the file fn\n"
209              "    write        - write to the file fn\n"
210              "    create       - create fn if it does not exist, then do a write test\n"
211              "    uncache      - remove cached blocks of fn from buffer pool, then exit\n"
212              "  access patterns\n"
213              "    rand         - access random records\n"
214              "    randhint     - access random records, but use GPFS multiple access range\n"
215              "                   hint\n"
216              "    strided      - access records according to a non-overlapped strided\n"
217              "                   pattern\n"
218              "    seq          - access records sequentially\n"
219              "  fn             - name of file to be accessed\n"
220              "  options can be:\n"
221              "    -nolabels    - produce a single line of output containing all parameters\n"
222              "                   and the measured data rate.  Format of the output line is\n"
223              "                   'op pattern fn recordSize nBytes fileSize nProcs nThreads \n"
224              "                     strideRecs inv ds dio shm fsync reltoken aio osync rate util'\n"
225              "                   (default is to produce multi-line labelled output)\n"
226              "    -r recsize   - record size (defaults to filesystem block size)\n"
227              "    -n nBytes    - number of bytes to transfer (defaults to file size)\n"
228              "    -s stride    - number of bytes between successive accesses by the\n"
229              "                   same thread.  Only meaningful for strided access patterns.\n"
230              "                   Must be a multiple of the record size.\n"
231              "                   Default is number of threads times number of processes.\n"
232              "    -th nThreads - number of threads per process (defaults to 1)\n"
233              "    -noinv       - do not clear blocks of fn from the GPFS file cache before\n"
234              "                   starting the test (default is to clear the cache)\n"
235              "    -ds          - use GPFS data shipping (default is not to use data\n"
236              "                   shipping)\n"
237              "    -aio depth   - use Asynch I/O, prefetching to depth (default 0, max 1000)\n"
238              "    -dio         - use direct I/O (default is not to use direct I/O)\n"
239              "    -shm         - use shared memory buffer, with pinned large pages if\n"
240              "                   possible (default is not to use shared memory buffer)\n"
241              "    -reltoken    - release byte range token after open\n"
242              "    -fsync       - fsync the file at the conclusion of a write or create test\n"
243              "                   to insure that no dirty data remain buffered (default is\n"
244              "                   not to fsync the file)\n"
245              "    -nocycle     - for the seq access pattern when -n is larger than\n"
246              "                   filesize / number-of-threads, read the entire file on each\n"
247              "                   node rather than repeatedly reading the same segment of\n"
248              "                   the file\n"
249              "    -tTimes      - print detailed task times\n"
250              "    -v           - verbose tracing\n"
251              "    -V           - very verbose tracing\n"
252              "\n"
253              "  Numbers can be given using K, M, G, or T suffixes, in either case, to\n"
254              "  denote 2**10, 2**20, 2**30, or 2**40, respectively, or can have an R\n"
255              "  suffix to denote a multiple of the record size.\n"
256              "\n"
257              "  Reported data rate is in units of 1000 bytes/second.\n");
258  exit(1);
259}
260
261
262/* Convert a string to a number.  The string may contain yy* prefixes and
263   suffixes K, M, G, or T in either case.  The suffix r or R is interpreted
264   as the record size as input by the -r parameter.  It is an error to use
265   an R suffix if the -r parameter has not already appeared. */
266long long GetNum(const char* p)
267{
268  Int64 sign = +1;
269  Int64 num = 0;
270  const char* saveP = p;
271
272  if (*p == '-')
273  {
274    sign = -1;
275    p += 1;
276  }
277  while (isdigit(*p))
278  {
279    num = 10*num + (*p)-'0';
280    p += 1;
281  }
282  num *= sign;
283
284  switch (*p)
285  {
286    case '\0':
287      return num;
288
289    case '*':
290      return num * GetNum(p+1);
291
292    case '-':
293      return num - GetNum(p+1);
294
295    case '+':
296      return num + GetNum(p+1);
297
298    case 'k':
299    case 'K':
300      return 1024LL * num;
301
302    case 'm':
303    case 'M':
304      return 1024LL * 1024LL * num;
305
306    case 'g':
307    case 'G':
308      return 1024LL * 1024LL * 1024LL * num;
309
310    case 't':
311    case 'T':
312      return 1024LL * 1024LL * 1024LL * 1024LL * num;
313
314    case 'r':
315    case 'R':
316      if (RecordSize == -1)
317      {
318        fprintf(stderr, "Cannot use suffix R before -r parameter\n");
319        exit(1);
320      }
321      return (Int64)RecordSize * num;
322
323    default:
324      fprintf(stderr, "Invalid number %s\n", saveP);
325      exit(1);
326  }
327  return 0;  /* eliminate compiler warning */
328}
329
330
/* Convert a 64-bit integer to a string with a binary size suffix.  The
   largest of G (2**30), M (2**20) or K (2**10) that divides the value
   exactly is used.  Other values are printed in plain decimal.  For
   example, 2048 displays as "2K" and 1000 as "1000".  Zero displays as
   "0" (the divisibility test alone would produce "0G").

   The text is written to bufP, which must hold at least 21 bytes, and
   bufP is returned.  The parameter type long long is the same type as
   Int64. */
char* int64ToString(long long in, char* bufP)
{
  const long long K = 1024LL;
  const long long M = 1024LL*1024LL;
  const long long G = 1024LL*1024LL*1024LL;

  if (in == 0)
    sprintf(bufP, "0");
  else if ((in % G) == 0)
    sprintf(bufP, "%lldG", in/G);
  else if ((in % M) == 0)
    sprintf(bufP, "%lldM", in/M);
  else if ((in % K) == 0)
    sprintf(bufP, "%lldK", in/K);
  else
    sprintf(bufP, "%lld", in);
  return bufP;
}
349
350
/* Verify that a return code is 0.  If it is not, report rc together with
   reasonP (a description of the operation that produced it) on stderr
   and exit the program with status 1. */
void CheckRC(int rc, const char* reasonP)
{
  if (rc == 0)
    return;
  fprintf(stderr, "RC %d from %s, exiting\n", rc, reasonP);
  exit(1);
}
359
360
361/* Compute the difference between two timestamps and return it as a
362   floating point number of seconds */
363double deltaTime(timebasestruct_t* startP, timebasestruct_t* stopP)
364{
365  double dStartTime, dStopTime;
366
367#ifdef GPFS_AIX
368  time_base_to_time(startP, TIMEBASE_SZ);
369  time_base_to_time(stopP, TIMEBASE_SZ);
370#endif
371  dStartTime = startP->tb_high + .000000001*startP->tb_low;
372  dStopTime = stopP->tb_high + .000000001*stopP->tb_low;
373  return dStopTime-dStartTime;
374}
375
376
377/* Do a series of random accesses to a file that has already been opened */
378void DoRandTest(int handle, struct ThreadParm * parmP, const char* labelP)
379{
380  timebasestruct_t time;
381  uint seed;
382  struct random_data rState;
383  char randState[256];
384  char* randP;
385  Int64 nRecordsLeft;
386  RANDRET_T randRet;
387  Int64 nextRecordNum;
388  offset_t desiredOffset;
389  offset_t actualOffset;
390  offset_t len;
391
392  /* Set the seed of the random number generator so each thread accesses
393     a different sequence of records */
394  GetTOD(&time);
395  seed = (time.tb_high ^ time.tb_low) + (parmP->threadIndex+1)*getpid();
396  memset(&rState, 0, sizeof(rState));
397
398  INITSTATE_R(seed, randState, sizeof(randState), &randP, &rState);
399  SETSTATE_R(randState, &randP, &rState);
400
401  /* Read or write random records */
402  nRecordsLeft = parmP->nRecords;
403  while (nRecordsLeft > 0)
404  {
405    /* Generate the next record number */
406    RANDOM_R(&randRet, &rState);
407    nextRecordNum = randRet % RecordsInFile;
408
409    /* Seek to the next record */
410    desiredOffset = (offset_t) (nextRecordNum * RecordSize);
411    actualOffset = lseek64(handle, desiredOffset, SEEK_SET);
412    if (actualOffset != desiredOffset)
413    {
414      fprintf(stderr, "Error lseek64'ing to %lld in %s\n",
415              desiredOffset, FileNameP);
416      exit(1);
417    }
418
419    /* Read or write next record */
420    if (Verbosity >= 2)
421      printf(" th %d: %s record %lld\n",
422             parmP->threadIndex, labelP, nextRecordNum);
423    if (TestType == ReadTest)
424      len = read(handle, parmP->bufP, RecordSize);
425    else
426      len = write(handle, parmP->bufP, RecordSize);
427    if (len != RecordSize)
428    {
429      fprintf(stderr, "Wrong length in %s: %lld\n", labelP, len);
430      exit(1);
431    }
432
433    /* Decrement number of records remaining and loop */
434    nRecordsLeft -= 1;
435  }
436
437}
438
439
440/* Do a series of random accesses to a file that has already been opened.
441   Before each group of accesses, issue a GPFS multiple access range hint
442   so GPFS can prefetch blocks before they are needed. */
443void DoRandTestWithHint(int handle, struct ThreadParm * parmP, const char* labelP)
444{
445  struct pixAccDesc * accP;
446  struct PrefetchedIrregularXfer x;
447  timebasestruct_t time;
448  uint seed;
449  struct random_data rState;
450  char randState[256];
451  char* randP;
452  Int64 nRecordsLeft;
453  int nRecordsInChunk;
454  int i;
455  RANDRET_T randRet;
456  Int64 nextRecordNum;
457  int rc;
458  int nBytes;
459
460  /* Allocate space to describe a batch of random accesses */
461  accP = (struct pixAccDesc *)malloc(MAX_PREFETCHES*sizeof(struct pixAccDesc));
462  if (accP == NULL)
463  {
464    fprintf(stderr, "Cannot allocate access descriptors\n");
465    exit(1);
466  }
467
468  /* Initialize the PrefetchedIrregularXfer state */
469  pixInit(&x);
470
471  /* Turn on tracing in the PrefetchedIrregularXfer class according to
472     the -v flag */
473  if (Verbosity >= 2)
474    pixSetTraceLevel(&x, 1);
475
476  /* Set the seed of the random number generator so each thread accesses
477     a different sequence of records */
478  GetTOD(&time);
479  seed = (time.tb_high ^ time.tb_low) + (parmP->threadIndex+1)*getpid();
480  memset(&rState, 0, sizeof(rState));
481
482  INITSTATE_R(seed, randState, sizeof(randState), &randP, &rState);
483  SETSTATE_R(randState, &randP, &rState);
484
485  /* Read or write random records */
486  nRecordsLeft = parmP->nRecords;
487  while (nRecordsLeft > 0)
488  {
489    /* Generate the next batch of record numbers */
490    if (nRecordsLeft >= MAX_PREFETCHES)
491      nRecordsInChunk = MAX_PREFETCHES;
492    else
493      nRecordsInChunk = nRecordsLeft;
494    for (i=0; i<nRecordsInChunk; i++)
495    {
496      RANDOM_R(&randRet, &rState);
497      nextRecordNum = randRet % RecordsInFile;
498      accP[i].off = (Int64)nextRecordNum * (Int64)RecordSize;
499      accP[i].len = RecordSize;
500    }
501
502    /* Give the list of future accesses to the PrefetchedIrregularXfer object
503       so it can begin issuing GPFS multiple access range hints to prefetch
504       the data that will be needed */
505    rc = pixDeclareAccesses(&x, handle, TestType!=ReadTest, nRecordsInChunk, accP);
506    if (rc != 0)
507    {
508      fprintf(stderr, "Error %d from declareAccesses\n", rc);
509      exit(1);
510    }
511
512    /* Perform all of the reads or writes in the current chunk */
513    for (i=0; i<nRecordsInChunk; i++)
514    {
515      rc = pixXfer(&x, parmP->bufP, &nBytes);
516      if (rc != 0)
517      {
518        fprintf(stderr, "Error %d from xfer %d\n", rc, i);
519        exit(1);
520      }
521      if (nBytes != RecordSize)
522      {
523        fprintf(stderr, "xfer %d moved %d bytes instead of %d\n",
524                i, nBytes, RecordSize);
525        exit(1);
526      }
527    }
528
529    /* Decrement number of records remaining and loop */
530    nRecordsLeft -= nRecordsInChunk;
531  }
532
533  /* Clean up PrefetchedIrregularXfer object, free any prefetched records,
534     and free storage for access descriptions */
535  pixTerm(&x);
536  free(accP);
537}
538
539
540/* Set a MAR for a file on this node. */
541void FreeAllRanges(int handle)
542{
543  struct
544  {
545    gpfsFcntlHeader_t hdr;
546    gpfsFreeRange_t fr;
547  } freeHint;
548  int rc;
549
550  /* Issue the free range hint */
551  freeHint.hdr.totalLength = sizeof(freeHint);
552  freeHint.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
553  freeHint.hdr.fcntlReserved = 0;
554  freeHint.fr.structLen = sizeof(gpfsFreeRange_t);
555  freeHint.fr.structType = GPFS_FREE_RANGE;
556  freeHint.fr.start = 0;
557  freeHint.fr.length = 0;
558  rc = gpfs_fcntl(handle, &freeHint);
559  if (rc != 0)
560  {
561    fprintf(stderr, "gpfs_fcntl free range hint failed for file '%s'. "
562              "errno=%d errorOffset=%d\n",
563            FileNameP, errno, freeHint.hdr.errorOffset);
564    exit(1);
565  }
566}
567
568
569/* Set a MAR for a file on this node. */
570void MARange(int handle, offset_t blockNumber, offset_t startoff,
571             offset_t blkLen, Boolean isWrite)
572{
573  struct
574  {
575    gpfsFcntlHeader_t hdr;
576    gpfsMultipleAccessRange_t marh;
577  } accHint;
578  int rc;
579
580  /* Issue the access range hint */
581  accHint.hdr.totalLength = sizeof(accHint);
582  accHint.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
583  accHint.hdr.fcntlReserved = 0;
584  accHint.marh.structLen = sizeof(accHint.marh);
585  accHint.marh.structType = GPFS_MULTIPLE_ACCESS_RANGE;
586  accHint.marh.accRangeCnt = 1;
587  accHint.marh.relRangeCnt = 0;
588  accHint.marh.accRangeArray[0].blockNumber = blockNumber;
589  accHint.marh.accRangeArray[0].start = startoff;
590  accHint.marh.accRangeArray[0].length = blkLen;
591  accHint.marh.accRangeArray[0].isWrite = isWrite;
592  rc = gpfs_fcntl(handle, &accHint);
593  if (rc != 0)
594  {
595    fprintf(stderr, "gpfs_fcntl access range hint failed for file '%s'. "
596              "errno=%d errorOffset=%d\n",
597            FileNameP, errno, accHint.hdr.errorOffset);
598    exit(1);
599  }
600}
601
602/* Do a series of strided accesses to a file that has already been
603   opened */
604void DoStridedTest(int handle, struct ThreadParm * parmP, const char* labelP)
605{
606  Int64 strideRecords;
607  Int64 nextRecordNum;
608  Int64 maxRecordNum;
609  Int64 nRecordsLeft;
610  offset_t desiredOffset;
611  offset_t actualOffset;
612  offset_t len;
613
614  /* Read or write records according to the strided pattern.  If the end of
615     the file is reached, seek back to the start record. */
616  if (Stride != 0)
617    strideRecords = Stride / RecordSize;
618  else
619    strideRecords = NThreadsPerProcess * NProcs;
620  nextRecordNum = parmP->startRecordNum;
621  maxRecordNum = parmP->maxRecordNum;
622  nRecordsLeft = parmP->nRecords;
623
624  while (nRecordsLeft > 0)
625  {
626    /* Seek to the next record */
627    desiredOffset = (offset_t) (nextRecordNum * RecordSize);
628    actualOffset = lseek64(handle, desiredOffset, SEEK_SET);
629    if (actualOffset != desiredOffset)
630    {
631      fprintf(stderr, "Error lseek64'ing to %lld in %s\n",
632              desiredOffset, FileNameP);
633      exit(1);
634    }
635
636    /* Read or write next record */
637    if (Verbosity >= 2)
638      printf(" th %d: %s record %lld\n",
639             parmP->threadIndex, labelP, nextRecordNum);
640    if (TestType == ReadTest)
641      len = read(handle, parmP->bufP, RecordSize);
642    else
643      len = write(handle, parmP->bufP, RecordSize);
644    if (len != RecordSize)
645    {
646      fprintf(stderr, "Wrong length in %s: %lld\n", labelP, len);
647      exit(1);
648    }
649
650    /* If next record would be outside of the area to be processed by this
651       thread, go back to the beginning of the area */
652    nextRecordNum += strideRecords;
653    if (nextRecordNum > maxRecordNum)
654      nextRecordNum = parmP->restartRecordNum;
655    nRecordsLeft -= 1;
656  }
657}
658
659
660/* Do a series of sequential accesses to a file that has already been
661   opened */
662void DoSeqTest(int handle, struct ThreadParm * parmP, const char* labelP)
663{
664  Int64 nextRecordNum;
665  offset_t desiredOffset;
666  offset_t actualOffset;
667  Int64 maxRecordNum;
668  Int64 nRecordsLeft;
669  offset_t len;
670
671  /* Seek to the beginning of the area that this thread should access */
672  nextRecordNum = parmP->startRecordNum;
673  desiredOffset = (offset_t) (nextRecordNum * RecordSize);
674  actualOffset = lseek64(handle, desiredOffset, SEEK_SET);
675  if (actualOffset != desiredOffset)
676  {
677    fprintf(stderr, "Error lseek64'ing to %lld in %s\n", desiredOffset, FileNameP);
678    exit(1);
679  }
680
681  /* Read or write records sequentially.  If the end of the area to be
682     accessed by this thread is reached, seek back to the beginning of the
683     area. */
684  maxRecordNum = parmP->maxRecordNum;
685  nRecordsLeft = parmP->nRecords;
686  while (nRecordsLeft > 0)
687  {
688    /* Read or write next record */
689    if (Verbosity >= 2)
690      printf(" th %d: %s record %lld\n",
691             parmP->threadIndex, labelP, nextRecordNum);
692    if (TestType == ReadTest)
693      len = read(handle, parmP->bufP, RecordSize);
694    else
695      len = write(handle, parmP->bufP, RecordSize);
696    if (len != RecordSize)
697    {
698      fprintf(stderr, "Wrong length in %s: %lld\n", labelP, len);
699      exit(1);
700    }
701
702    /* If next record would be outside of the area to be processed by this
703       thread, seek back to the beginning of the area */
704    nextRecordNum += 1;
705    if (nextRecordNum > maxRecordNum)
706    {
707      nextRecordNum = parmP->restartRecordNum;
708      desiredOffset = (offset_t) (nextRecordNum * RecordSize);
709      actualOffset = lseek64(handle, desiredOffset, SEEK_SET);
710      if (actualOffset != desiredOffset)
711      {
712        fprintf(stderr, "Error lseek64'ing to %lld in %s\n",
713                desiredOffset, FileNameP);
714        exit(1);
715      }
716    }
717    nRecordsLeft -= 1;
718  }
719}
720
721
722/* Do a series of strided accesses to a file that has already been
723   opened, using AIO interface */
724void DoStridedAIOTest(int handle, struct ThreadParm * parmP, const char* labelP)
725{
726#ifndef NO_AIO
727  Int64 strideRecords;
728  Int64 nextRecordNum;
729  Int64 maxRecordNum;
730  Int64 nRecordsLeft;
731  int len;
732  int i, rc;
733  char *curbufP;
734  int OpIndex, PrefetchIndex;
735  struct aiocb *accP;
736
737  /* Allocate space to describe a batch of accesses */
738  accP = (struct aiocb *)malloc(AIODepth*sizeof(struct aiocb));
739  if (accP == NULL)
740  {
741    fprintf(stderr, "Cannot allocate AIO descriptors\n");
742    exit(1);
743  }
744
745  /* Set pointers to the buffers to be read into */
746  curbufP = parmP->bufP;
747  for (i = 0; i < AIODepth; i++)
748  {
749    accP[i].aio_buf = curbufP;
750    accP[i].aio_nbytes = RecordSize;
751    /*accP[i].aio_offset = 0;     set later */
752    curbufP += RecordSize;
753#ifdef GPFS_AIX
754    accP[i].aio_whence = SEEK_SET;
755    accP[i].aio_flag = 0;        /* not AIO_SIGNAL */
756    accP[i].aio_handle = 0;
757#else
758    accP[i].aio_fildes = handle;
759    accP[i].aio_lio_opcode = 0;
760    accP[i].aio_reqprio = 0;
761    accP[i].aio_sigevent.sigev_notify = SIGEV_NONE;
762#endif
763  }
764
765  /* Read or write records according to the strided pattern.  If the end of
766     the file is reached, seek back to the start record. */
767  if (Stride != 0)
768    strideRecords = Stride / RecordSize;
769  else
770    strideRecords = NThreadsPerProcess * NProcs;
771  nextRecordNum = parmP->startRecordNum;
772
773  maxRecordNum = parmP->maxRecordNum;
774  nRecordsLeft = parmP->nRecords;
775  OpIndex = 0;
776  PrefetchIndex = 0;
777  while (nRecordsLeft > 0)
778  {
779    if (PrefetchIndex >= AIODepth)
780    {
781      CONST struct aiocb *accPP[] = { &accP[OpIndex] };
782      /* All prefetches done, wait for next buffer before starting next
783         prefetch */
784      do
785      {
786        rc = AIO_SUSPEND(1, accPP);
787        if (rc < 0 && errno != EINTR)
788        {
789          fprintf(stderr, "Error %d from aio_suspend %d\n", errno, OpIndex);
790          exit(1);
791        }
792      } while (rc != 0);
793      len = AIO_RETURN(accP[OpIndex]);
794      if (len < 0)
795      {
796        fprintf(stderr, "Error %d from AIO %d\n",
797                AIO_ERROR(accP[OpIndex]), OpIndex);
798        exit(1);
799      }
800      if (len != RecordSize)
801      {
802        fprintf(stderr, "AIO %d moved %d bytes instead of %d\n",
803                OpIndex, len, RecordSize);
804        exit(1);
805      }
806      PrefetchIndex = AIODepth; /* don't overflow PrefetchIndex */
807    }
808
809    /* Read or write next record */
810    accP[OpIndex].aio_offset = (offset_t)nextRecordNum * (offset_t)RecordSize;
811    if (Verbosity >= 2)
812      printf(" th %d: %s record %lld\n",
813             parmP->threadIndex, labelP, nextRecordNum);
814    /* Start the next read or write */
815    do
816    {
817      if (TestType == ReadTest)
818        rc = AIO_READ(handle, &accP[OpIndex]);
819      else
820        rc = AIO_WRITE(handle, &accP[OpIndex]);
821      if (rc < 0)
822      {
823        if (errno != EAGAIN)
824        {
825          fprintf(stderr, "Error %d from aio_read/write %d\n", errno, OpIndex);
826          exit(1);
827        }
828      }
829    } while (rc != 0);
830
831    /* If next record would be outside of the area to be processed by this
832       thread, go back to the beginning of the area */
833    nextRecordNum += strideRecords;
834    if (nextRecordNum > maxRecordNum)
835      nextRecordNum = parmP->restartRecordNum;
836    nRecordsLeft -= 1;
837    OpIndex++;
838    if (OpIndex >= AIODepth) OpIndex = 0;
839    PrefetchIndex++;
840  }
841#endif /* ! NO_AIO */
842}
843
844
845/* Do a series of sequential accesses to a file that has already been
846   opened, using AIO interface */
847void DoSeqAIOTest(int handle, struct ThreadParm * parmP, const char* labelP)
848{
849#ifndef NO_AIO
850  Int64 nextRecordNum;
851  Int64 maxRecordNum;
852  Int64 nRecordsLeft;
853  int len;
854  int i, rc;
855  char *curbufP;
856  int OpIndex, PrefetchIndex;
857  struct aiocb *accP;
858
859  /* Allocate space to describe a batch of accesses */
860  accP = (struct aiocb *)malloc(AIODepth*sizeof(struct aiocb));
861  if (accP == NULL)
862  {
863    fprintf(stderr, "Cannot allocate AIO descriptors\n");
864    exit(1);
865  }
866
867  /* Set pointers to the buffers to be read into */
868  curbufP = parmP->bufP;
869  for (i = 0; i < AIODepth; i++)
870  {
871    accP[i].aio_buf = curbufP;
872    accP[i].aio_nbytes = RecordSize;
873    /*accP[i].aio_offset = 0;     set later */
874    curbufP += RecordSize;
875#ifdef GPFS_AIX
876    accP[i].aio_whence = SEEK_SET;
877    accP[i].aio_flag = 0;        /* not AIO_SIGNAL */
878    accP[i].aio_handle = 0;
879#else
880    accP[i].aio_fildes = handle;
881    accP[i].aio_lio_opcode = 0;
882    accP[i].aio_reqprio = 0;
883    accP[i].aio_sigevent.sigev_notify = SIGEV_NONE;
884#endif
885  }
886
887  /* Seek to the beginning of the area that this thread should access */
888  nextRecordNum = parmP->startRecordNum;
889
890  /* Read or write records sequentially.  If the end of the area to be
891     accessed by this thread is reached, seek back to the beginning of the
892     area. */
893  maxRecordNum = parmP->maxRecordNum;
894  nRecordsLeft = parmP->nRecords;
895  OpIndex = 0;
896  PrefetchIndex = 0;
897  while (nRecordsLeft > 0)
898  {
899    if (PrefetchIndex >= AIODepth)
900    {
901      CONST struct aiocb *accPP[] = { &accP[OpIndex] };
902      /* All prefetches done, wait for next buffer before starting next
903         prefetch */
904      do
905      {
906        rc = AIO_SUSPEND(1, accPP);
907        if (rc < 0 && errno != EINTR)
908        {
909          fprintf(stderr, "Error %d from aio_suspend %d\n", errno, OpIndex);
910          exit(1);
911        }
912      } while (rc != 0);
913      len = AIO_RETURN(accP[OpIndex]);
914      if (len < 0)
915      {
916        fprintf(stderr, "Error %d from AIO %d\n",
917                AIO_ERROR(accP[OpIndex]), OpIndex);
918        exit(1);
919      }
920      if (len != RecordSize)
921      {
922        fprintf(stderr, "AIO %d moved %d bytes instead of %d\n",
923                OpIndex, len, RecordSize);
924        exit(1);
925      }
926      PrefetchIndex = AIODepth; /* don't overflow PrefetchIndex */
927    }
928
929    /* Read or write next record */
930    accP[OpIndex].aio_offset = (offset_t)nextRecordNum * (offset_t)RecordSize;
931    if (Verbosity >= 2)
932      printf(" th %d: %s record %lld\n",
933             parmP->threadIndex, labelP, nextRecordNum);
934    /* Start the next read or write */
935    do
936    {
937      if (TestType == ReadTest)
938        rc = AIO_READ(handle, &accP[OpIndex]);
939      else
940        rc = AIO_WRITE(handle, &accP[OpIndex]);
941      if (rc < 0)
942      {
943        if (errno != EAGAIN)
944        {
945          fprintf(stderr, "Error %d from aio_read/write %d\n", errno, OpIndex);
946          exit(1);
947        }
948      }
949    } while (rc != 0);
950
951    /* If next record would be outside of the area to be processed by this
952       thread, seek back to the beginning of the area */
953    nextRecordNum += 1;
954    if (nextRecordNum > maxRecordNum)
955      nextRecordNum = parmP->restartRecordNum;
956    nRecordsLeft -= 1;
957    OpIndex++;
958    if (OpIndex >= AIODepth) OpIndex = 0;
959    PrefetchIndex++;
960  }
961#endif /* ! NO_AIO */
962}
963
964
965/* Do a series of random accesses to a file that has already been opened.
966   Issue a AIO calls to prefetch blocks before they are needed. */
967void DoRandAIOTest(int handle, struct ThreadParm * parmP, const char* labelP)
968{
969#ifndef NO_AIO
970  timebasestruct_t time;
971  uint seed;
972  struct random_data rState;
973  char randState[256];
974  char* randP;
975  Int64 nRecordsLeft;
976  RANDRET_T randRet;
977  Int64 nextRecordNum;
978  int i, rc;
979  int len;
980  char *curbufP;
981  int OpIndex, PrefetchIndex;
982  struct aiocb *accP;
983
984  /* Allocate space to describe a batch of random accesses */
985  accP = (struct aiocb *)malloc(AIODepth*sizeof(struct aiocb));
986  if (accP == NULL)
987  {
988    fprintf(stderr, "Cannot allocate AIO descriptors\n");
989    exit(1);
990  }
991
992  /* Set pointers to the buffers to be read into */
993  curbufP = parmP->bufP;
994  for (i = 0; i < AIODepth; i++)
995  {
996    accP[i].aio_buf = curbufP;
997    accP[i].aio_nbytes = RecordSize;
998    /*accP[i].aio_offset = 0;     set later */
999    curbufP += RecordSize;
1000#ifdef GPFS_AIX
1001    accP[i].aio_whence = SEEK_SET;
1002    accP[i].aio_flag = 0;        /* not AIO_SIGNAL */
1003    accP[i].aio_handle = 0;
1004#else
1005    accP[i].aio_fildes = handle;
1006    accP[i].aio_lio_opcode = 0;
1007    accP[i].aio_reqprio = 0;
1008    accP[i].aio_sigevent.sigev_notify = SIGEV_NONE;
1009#endif
1010  }
1011
1012  /* Set the seed of the random number generator so each thread accesses
1013     a different sequence of records */
1014  GetTOD(&time);
1015  seed = (time.tb_high ^ time.tb_low) + (parmP->threadIndex+1)*getpid();
1016  memset(&rState, 0, sizeof(rState));
1017
1018  INITSTATE_R(seed, randState, sizeof(randState), &randP, &rState);
1019  SETSTATE_R(randState, &randP, &rState);
1020
1021  /* Read or write random records */
1022  OpIndex = 0;
1023  PrefetchIndex = 0;
1024  nRecordsLeft = parmP->nRecords;
1025  while (nRecordsLeft > 0)
1026  {
1027    if (PrefetchIndex >= AIODepth)
1028    {
1029      CONST struct aiocb *accPP[] = { &accP[OpIndex] };
1030      /* All prefetches done, wait for next buffer before starting next
1031         prefetch */
1032      do
1033      {
1034        rc = AIO_SUSPEND(1, accPP);
1035        if (rc < 0 && errno != EINTR)
1036        {
1037          fprintf(stderr, "Error %d from aio_suspend %d\n", errno, OpIndex);
1038          exit(1);
1039        }
1040      } while (rc != 0);
1041      len = AIO_RETURN(accP[OpIndex]);
1042      if (len < 0)
1043      {
1044        fprintf(stderr, "Error %d from AIO %d\n",
1045                AIO_ERROR(accP[OpIndex]), OpIndex);
1046        exit(1);
1047      }
1048      if (len != RecordSize)
1049      {
1050        fprintf(stderr, "AIO %d moved %d bytes instead of %d\n",
1051                OpIndex, len, RecordSize);
1052        exit(1);
1053      }
1054      PrefetchIndex = AIODepth; /* don't overflow PrefetchIndex */
1055    }
1056
1057    /* Generate the next record number */
1058    RANDOM_R(&randRet, &rState);
1059    nextRecordNum = randRet % RecordsInFile;
1060    accP[OpIndex].aio_offset = (Int64)nextRecordNum * (Int64)RecordSize;
1061
1062    /* Start the next read or write */
1063    do
1064    {
1065      if (TestType == ReadTest)
1066        rc = AIO_READ(handle, &accP[OpIndex]);
1067      else
1068        rc = AIO_WRITE(handle, &accP[OpIndex]);
1069      if (rc < 0)
1070      {
1071        if (errno != EAGAIN)
1072        {
1073          fprintf(stderr, "Error %d from aio_read/write %d\n", errno, OpIndex);
1074          exit(1);
1075        }
1076      }
1077    } while (rc != 0);
1078
1079    /* Decrement number of records remaining and loop */
1080    nRecordsLeft--;
1081    OpIndex++;
1082    if (OpIndex >= AIODepth) OpIndex = 0;
1083    PrefetchIndex++;
1084  }
1085
1086  /* Clean up aiocbs */
1087  free(accP);
1088#endif /* ! NO_AIO */
1089}
1090
1091
1092/* Start up data shipping.  The second parameter is the total number of
1093   open instances on all nodes that will be operating on the file.  Must be
1094   called for every such instance with the same value of nInsts. */
1095void StartDataShipping(int handle, int nInsts)
1096{
1097  struct
1098  {
1099    gpfsFcntlHeader_t hdr;
1100    gpfsDataShipStart_t start;
1101  } dsStart;
1102  int rc;
1103
1104  dsStart.hdr.totalLength = sizeof(dsStart);
1105  dsStart.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
1106  dsStart.hdr.fcntlReserved = 0;
1107  dsStart.start.structLen = sizeof(gpfsDataShipStart_t);
1108  dsStart.start.structType = GPFS_DATA_SHIP_START;
1109  dsStart.start.numInstances = nInsts;
1110  dsStart.start.reserved = 0;
1111  rc = gpfs_fcntl(handle, &dsStart);
1112  if (rc != 0)
1113  {
1114    fprintf(stderr, "gpfs_fcntl DS start directive failed. "
1115              "errno=%d errorOffset=%d\n", errno, dsStart.hdr.errorOffset);
1116    exit(1);
1117  }
1118}
1119
1120
1121/* Shut down data shipping.  Must be called for every handle for which
1122   StartDataShipping was called. */
1123void StopDataShipping(int handle)
1124{
1125  struct
1126  {
1127    gpfsFcntlHeader_t hdr;
1128    gpfsDataShipStop_t stop;
1129  } dsStop;
1130  int rc;
1131
1132  dsStop.hdr.totalLength = sizeof(dsStop);
1133  dsStop.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
1134  dsStop.hdr.fcntlReserved = 0;
1135  dsStop.stop.structLen = sizeof(dsStop.stop);
1136  dsStop.stop.structType = GPFS_DATA_SHIP_STOP;
1137  rc = gpfs_fcntl(handle, &dsStop);
1138  if (rc != 0)
1139    printf("gpfs_fcntl DS stop directive failed. errno=%d errorOffset=%d\n",
1140           errno, dsStop.hdr.errorOffset);
1141}
1142
1143
1144/* Body of one worker thread.  Parameter is the local thread index. */
1145void* TestThreadBody(void* parm)
1146{
1147  long th = (long)parm;
1148  struct ThreadParm * parmP= &ParmArray[th];
1149  int openFlags;
1150  timebasestruct_t afterOpenTime, beforeCloseTime;
1151  int handle;
1152  char* labelP;
1153  int rc;
1154
1155  /* Dump thread info if verbose tracing enabled */
1156  if (Verbosity >= 1)
1157    printf("%d: th %d starting startRecordNum %lld restartRecordNum %lld maxRecordNum %lld nRecords %lld\n",
1158           ProcNum, parmP->threadIndex, parmP->startRecordNum,
1159           parmP->restartRecordNum, parmP->maxRecordNum, parmP->nRecords);
1160
1161  /* Open the file with the appropriate flags */
1162  switch (TestType)
1163  {
1164    case CreateTest:
1165      if (ProcNum == 0)
1166        openFlags = O_CREAT | O_WRONLY;
1167      else
1168        openFlags = O_WRONLY;
1169      break;
1170
1171    case WriteTest:
1172      openFlags = O_WRONLY;
1173      break;
1174
1175    case ReadTest:
1176      openFlags = O_RDONLY;
1177      break;
1178
1179    default:
1180      openFlags = 0;
1181      break;
1182  }
1183  if (UseDirectIO)
1184    openFlags |= O_DIRECT;
1185  if (UseOsync)
1186    openFlags |= O_SYNC;
1187#ifdef MULTI_NODE
1188  if (TestType == CreateTest)
1189  {
1190    /* Create file on process 0 node first, then open on the rest */
1191    if (ProcNum == 0)
1192    {
1193      handle = open(FileNameP, openFlags, 0666);
1194      if (DoFreeAll)
1195        FreeAllRanges(handle);
1196    }
1197    rc = MPI_Barrier(MPI_COMM_WORLD);
1198    CheckRC(rc, "start MPI_Barrier");
1199    if (ProcNum != 0)
1200      handle = open(FileNameP, openFlags, 0666);
1201  }
1202  else
1203    handle = open(FileNameP, openFlags, 0666);
1204#else
1205  handle = open(FileNameP, openFlags, 0666);
1206#endif
1207  if (handle == -1)
1208  {
1209    fprintf(stderr, "Could not open file %s, errno=%d\n",
1210            FileNameP, errno);
1211    exit(1);
1212  }
1213
1214  /* Set up data shipping if necessary */
1215  if (UseDataShipping)
1216    StartDataShipping(handle, NThreadsPerProcess * NProcs);
1217
1218  /* Capture start timestamp for transfers by this thread */
1219  GetTOD(&afterOpenTime);
1220
1221  /* Read or write the requested number of records according to the correct
1222     access pattern */
1223  if (TestType == ReadTest)
1224    labelP = "read";
1225  else
1226    labelP = "write";
1227  switch (TestPattern)
1228  {
1229    case RandPattern:
1230      if (UseAsyncIO)
1231        DoRandAIOTest(handle, parmP, labelP);
1232      else
1233        DoRandTest(handle, parmP, labelP);
1234      break;
1235
1236    case RandPatternWithHint:
1237      DoRandTestWithHint(handle, parmP, labelP);
1238      break;
1239
1240    case StridedPattern:
1241      if (UseAsyncIO)
1242        DoStridedAIOTest(handle, parmP, labelP);
1243      else
1244        DoStridedTest(handle, parmP, labelP);
1245      break;
1246
1247    case SeqPattern:
1248      if (UseAsyncIO)
1249        DoSeqAIOTest(handle, parmP, labelP);
1250      else
1251        DoSeqTest(handle, parmP, labelP);
1252      break;
1253
1254    default:
1255      break;
1256  }
1257
1258  /* Timestamp end of transfers and remember how long the thread ran */
1259  GetTOD(&beforeCloseTime);
1260  parmP->xferInterval = deltaTime(&afterOpenTime, &beforeCloseTime);
1261
1262  /* Shut down data shipping if it was in effect */
1263  if (UseDataShipping)
1264    StopDataShipping(handle);
1265
1266  /* Close the file and print progress indicator */
1267  rc = close(handle);
1268  CheckRC(rc, "closing file");
1269  if (Verbosity >= 1)
1270    printf("%d: th %d finished, did all transfers in %.2f seconds\n",
1271           ProcNum, parmP->threadIndex, parmP->xferInterval);
1272  pthread_exit(NULL);
1273}
1274
1275
1276/* Invalidate all cached data held on behalf of a file on this node. */
1277void InvalidateFileCache(char* invFileNameP)
1278{
1279  int handle;
1280  struct
1281  {
1282    gpfsFcntlHeader_t hdr;
1283    gpfsClearFileCache_t inv;
1284  } invCacheHint;
1285  int rc;
1286
1287  /* Open the file.  If the open fails, the file cannot be cached. */
1288  handle = open(invFileNameP, O_RDONLY, 0);
1289  if (handle == -1)
1290    return;
1291
1292  /* Issue the invalidate hint */
1293  invCacheHint.hdr.totalLength = sizeof(invCacheHint);
1294  invCacheHint.hdr.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
1295  invCacheHint.hdr.fcntlReserved = 0;
1296  invCacheHint.inv.structLen = sizeof(gpfsClearFileCache_t);
1297  invCacheHint.inv.structType = GPFS_CLEAR_FILE_CACHE;
1298  rc = gpfs_fcntl(handle, &invCacheHint);
1299  if (rc != 0)
1300  {
1301    fprintf(stderr, "gpfs_fcntl clear cache hint failed for file '%s'. "
1302              "errno=%d errorOffset=%d\n",
1303            invFileNameP, errno, invCacheHint.hdr.errorOffset);
1304    exit(1);
1305  }
1306
1307  /* Close the file */
1308  rc = close(handle);
1309  if (rc == -1)
1310  {
1311    fprintf(stderr, "Could not close file '%s' after flushing file cache, errno=%d\n",
1312            invFileNameP, errno);
1313    exit(1);
1314  }
1315}
1316
1317
1318/* This routine is called at exit to clean up any shared memory segments
1319   that might have been created if the UseSharedMem option was selected. */
1320static void cleanupSegs()
1321{
1322  int th;
1323  for (th=0; th<NThreadsPerProcess; th++)
1324    if (ParmArray[th].shmid >= 0)
1325    {
1326      shmdt(ParmArray[th].origbufP);
1327      shmctl(ParmArray[th].shmid, IPC_RMID, NULL);
1328    }
1329}
1330
1331
1332/* Main routine */
1333int main(int argc, char *argvPP[])
1334{
1335  int i, j, rc, len;
1336  struct stat statBuf;
1337  int handle;
1338  long th;
1339  Int64 baseRecordsPerThreadToXfer;
1340  Int64 threadsWithExtraRecordToXfer;
1341  Int64 baseRecordsPerThreadInFile;
1342  Int64 threadsWithExtraRecordInFile;
1343  int gblThreadIndex;
1344  char* p;
1345  char buf1[32], buf2[32], buf3[32];
1346  pthread_t threadIds[MAX_THREADS];
1347  pthread_attr_t threadAttr;
1348  timebasestruct_t startTime, endTime;
1349  double totalSeconds;
1350  double threadTimes[MAX_THREADS];
1351  double* allXferTimesP = NULL;
1352  double totalXferTimes;
1353  double threadUtil;
1354  double dataRate;
1355  double* dP;
1356
1357  /* Initialize MPI communication */
1358#ifdef MULTI_NODE
1359  MPI_Init(&argc, &argvPP);
1360  MPI_Comm_rank(MPI_COMM_WORLD, &ProcNum);
1361  MPI_Comm_size(MPI_COMM_WORLD, &NProcs);
1362#else
1363  ProcNum = 0;
1364  NProcs = 1;
1365#endif
1366
1367  /* Parse arguments */
1368  for (i=1; i<argc; i++)
1369  {
1370    if (strcmp(argvPP[i], "-r") == 0  &&  i+1 < argc)
1371    {
1372      i += 1;
1373      RecordSize = GetNum(argvPP[i]);
1374    }
1375    else if (strcmp(argvPP[i], "-n") == 0  &&  i+1<argc)
1376    {
1377      i += 1;
1378      BytesToTransfer = GetNum(argvPP[i]);
1379    }
1380    else if (strcmp(argvPP[i], "-s") == 0  &&  i+1<argc)
1381    {
1382      i += 1;
1383      Stride = GetNum(argvPP[i]);
1384    }
1385    else if (strcmp(argvPP[i], "-th") == 0  &&  i+1<argc)
1386    {
1387      i += 1;
1388      NThreadsPerProcess = GetNum(argvPP[i]);
1389    }
1390    else if (strcmp(argvPP[i], "create") == 0)
1391      TestType = CreateTest;
1392    else if (strcmp(argvPP[i], "write") == 0)
1393      TestType = WriteTest;
1394    else if (strcmp(argvPP[i], "read") == 0)
1395      TestType = ReadTest;
1396    else if (strcmp(argvPP[i], "uncache") == 0)
1397      TestType = UncacheTest;
1398    else if (strcmp(argvPP[i], "rand") == 0)
1399      TestPattern = RandPattern;
1400    else if (strcmp(argvPP[i], "randhint") == 0)
1401      TestPattern = RandPatternWithHint;
1402    else if (strcmp(argvPP[i], "strided") == 0)
1403      TestPattern = StridedPattern;
1404    else if (strcmp(argvPP[i], "seq") == 0)
1405      TestPattern = SeqPattern;
1406    else if (strcmp(argvPP[i], "-nolabels") == 0)
1407      LabelledOutput = false;
1408    else if (strcmp(argvPP[i], "-noinv") == 0)
1409      DoInvalidate = false;
1410    else if (strcmp(argvPP[i], "-ds") == 0)
1411      UseDataShipping = true;
1412    else if (strcmp(argvPP[i], "-aio") == 0)
1413    {
1414      i += 1;
1415#ifdef NO_AIO
1416      fprintf(stderr, "aio option ignored.\n");
1417#else
1418      AIODepth = GetNum(argvPP[i]);
1419      UseAsyncIO = true;
1420#endif /* ! NO_AIO */
1421    }
1422    else if (strcmp(argvPP[i], "-dio") == 0)
1423      UseDirectIO = true;
1424    else if (strcmp(argvPP[i], "-shm") == 0)
1425      UseSharedMem = true;
1426    else if (strcmp(argvPP[i], "-reltoken") == 0)
1427      DoFreeAll = true;
1428    else if (strcmp(argvPP[i], "-fsync") == 0)
1429      DoFsync = true;
1430    else if (strcmp(argvPP[i], "-nocycle") == 0)
1431      CycleData = false;
1432    else if (strcmp(argvPP[i], "-tTimes") == 0)
1433      TaskTimes = true;
1434    else if (strcmp(argvPP[i], "-osync") == 0)
1435      UseOsync = true;
1436    else if (strcmp(argvPP[i], "-v") == 0)
1437      Verbosity = 1;
1438    else if (strcmp(argvPP[i], "-V") == 0)
1439      Verbosity = 2;
1440    else
1441    {
1442      if (FileNameP == NULL)
1443        FileNameP = argvPP[i];
1444      else
1445      {
1446        if (ProcNum == 0)
1447          fprintf(stderr, "File name '%s' already specified.\n", FileNameP);
1448        exit(1);
1449      }
1450    }
1451  }
1452
1453  /* Validate arguments */
1454  if (FileNameP == NULL)
1455  {
1456    if (ProcNum == 0)
1457      fprintf(stderr, "No file name given\n");
1458    Usage();
1459  }
1460
1461  /* Stat the file to compute its size.  If the file will be created by
1462     this test, use the size given by the -n parameter. */
1463  rc = stat(FileNameP, &statBuf);
1464  if (rc == -1)
1465  {
1466    if (TestType == CreateTest)
1467    {
1468      statBuf.st_blksize = RecordSize;
1469      statBuf.st_size = BytesToTransfer;
1470    }
1471    else
1472    {
1473      if (ProcNum == 0)
1474        fprintf(stderr, "Could not stat file '%s', errno=%d\n",
1475                FileNameP, errno);
1476      exit(1);
1477    }
1478  }
1479  if (UseAsyncIO)
1480  {
1481    if (TestPattern==RandPatternWithHint)
1482    {
1483      fprintf(stderr, "Cannot use -aio with randhint\n");
1484      exit(1);
1485    }
1486    if (NThreadsPerProcess > 1)
1487    {
1488      fprintf(stderr, "Multiple threads per process cannot be used with -aio\n");
1489      exit(1);
1490    }
1491    if (AIODepth <= 0 || AIODepth > 1000)
1492    {
1493      fprintf(stderr, "-aio depth must be greater than 0 and less than 1000\n");
1494      exit(1);
1495    }
1496  }
1497
1498  /* If the file already exists, issue the GPFS hint that flushes and
1499     invalidates all cached data belonging to the file so that each
1500     test begins in the same state */
1501  if (rc == 0  &&  (DoInvalidate || TestType == UncacheTest))
1502  {
1503    InvalidateFileCache(FileNameP);
1504    if (TestType == UncacheTest)
1505    {
1506      /* Nothing else to do.  Wait until all processes have reached this
1507         point, then exit */
1508#ifdef MULTI_NODE
1509      rc = MPI_Barrier(MPI_COMM_WORLD);
1510      CheckRC(rc, "finish MPI_Barrier");
1511#endif
1512      if (ProcNum == 0)
1513        printf("uncache %s\n", FileNameP);
1514#ifdef MULTI_NODE
1515      MPI_Finalize();
1516#endif
1517      return 0;
1518    }
1519  }
1520
1521  /* Continue validating arguments after cache invalidation */
1522  if (TestType == NoTest)
1523  {
1524    if (ProcNum == 0)
1525      fprintf(stderr, "No test type given\n");
1526    Usage();
1527  }
1528  if (TestPattern == NoPattern)
1529  {
1530    if (ProcNum == 0)
1531      fprintf(stderr, "No access pattern given\n");
1532    Usage();
1533  }
1534  if (Stride != 0  &&
1535      TestPattern != StridedPattern)
1536  {
1537    if (ProcNum == 0)
1538      fprintf(stderr, "Cannot specify -s unless access pattern is strided\n");
1539    Usage();
1540  }
1541  if (NThreadsPerProcess <= 0  ||  NThreadsPerProcess > MAX_THREADS)
1542  {
1543    if (ProcNum == 0)
1544      fprintf(stderr, "Invalid number of threads\n");
1545    Usage();
1546  }
1547
1548  /* If record size or number of bytes to transfer are not known, use the
1549     information from the stat */
1550  if (RecordSize == -1)
1551    RecordSize = statBuf.st_blksize;
1552  if (BytesToTransfer == -1)
1553    BytesToTransfer = (Int64)statBuf.st_size;
1554
1555  /* Make sure that -r and -n are known */
1556  if (RecordSize == -1)
1557  {
1558    if (ProcNum == 0)
1559      fprintf(stderr, "Record size not known; use -r option\n");
1560    exit(1);
1561  }
1562  if (BytesToTransfer == -1)
1563  {
1564    if (ProcNum == 0)
1565      fprintf(stderr, "Bytes to transfer not known; use -n option\n");
1566    exit(1);
1567  }
1568
1569  /* Make sure -s is legal */
1570  if (Stride != 0 &&
1571      (Stride%RecordSize) != 0)
1572  {
1573    if (ProcNum == 0)
1574      fprintf(stderr, "Stride must be a multiple of record size\n");
1575    exit(1);
1576  }
1577
1578  /* Compute number of records to read and write as well as number of
1579     records in the file */
1580  RecordsInFile = (Int64)statBuf.st_size / RecordSize;
1581  RecordsToTransfer = BytesToTransfer / RecordSize;
1582  BytesToTransfer = RecordsToTransfer * RecordSize;
1583
1584  /* Make sure the file is big enough to allow non-overlapping accesses */
1585  if (NThreadsPerProcess*NProcs > RecordsInFile  &&
1586      (TestPattern==StridedPattern  ||  TestPattern==SeqPattern))
1587  {
1588    if (ProcNum == 0)
1589      fprintf(stderr, "File too small to allow non-overlapping accesses\n");
1590    exit(1);
1591  }
1592
1593  /* If using shared memory, register a routine to clean up segments on exit */
1594  if (UseSharedMem)
1595  {
1596    for (th=0; th<NThreadsPerProcess; th++)
1597      ParmArray[th].shmid = -1;
1598
1599    if (atexit(cleanupSegs) < 0)
1600    {
1601      fprintf(stderr, "Cannot register exit routine\n");
1602      exit(1);
1603    }
1604  }
1605
1606  /* Compute parameters for each worker thread, allocate each thread a
1607     buffer, and touch all pages of the buffer to avoid page faults after we
1608     begin timing.  The calculation of which records are processed by each
1609     thread is complicated by the possibility that the number of records in
1610     the file and/or the number of records to be processed may not be an
1611     integral multiple of the total number of threads.
1612       For example, suppose there are 2 threads on each of 3 nodes, there
1613     are a total of 14 records in the file, and we wish to process a total
1614     of 17 records.  Using a strided access pattern, the records to be
1615     processed by each thread are as follows:
1616       proc 0 thread 0: 0  6 12
1617       proc 0 thread 1: 1  7 13
1618       proc 1 thread 0: 2  8  2
1619       proc 1 thread 1: 3  9  3
1620       proc 2 thread 0: 4 10  4
1621       proc 2 thread 1: 5 11
1622     Under the same assumptions, a sequential access pattern would do the
1623     following accesses:
1624       proc 0 thread 0:  0  1  2
1625       proc 0 thread 1:  3  4  5
1626       proc 1 thread 0:  6  7  6
1627       proc 1 thread 1:  8  9  8
1628       proc 2 thread 0: 10 11 10
1629       proc 2 thread 1: 12 13
1630   */
1631  baseRecordsPerThreadToXfer = RecordsToTransfer / (NThreadsPerProcess*NProcs);
1632  threadsWithExtraRecordToXfer = RecordsToTransfer -
1633                           (baseRecordsPerThreadToXfer*NThreadsPerProcess*NProcs);
1634  baseRecordsPerThreadInFile = RecordsInFile / (NThreadsPerProcess*NProcs);
1635  threadsWithExtraRecordInFile = RecordsInFile -
1636                           (baseRecordsPerThreadInFile*NThreadsPerProcess*NProcs);
1637  for (th=0; th<NThreadsPerProcess; th++)
1638  {
1639    gblThreadIndex = ProcNum*NThreadsPerProcess + th;
1640    ParmArray[th].nRecords = baseRecordsPerThreadToXfer;
1641    if (gblThreadIndex < threadsWithExtraRecordToXfer)
1642      ParmArray[th].nRecords += 1;
1643    if (TestPattern == StridedPattern)
1644    {
1645      ParmArray[th].startRecordNum = gblThreadIndex;
1646      ParmArray[th].restartRecordNum = gblThreadIndex;
1647      ParmArray[th].maxRecordNum = RecordsInFile - 1;
1648    }
1649    else if (TestPattern == SeqPattern)
1650    {
1651      if (gblThreadIndex < threadsWithExtraRecordInFile)
1652        ParmArray[th].startRecordNum = gblThreadIndex *
1653                                       (baseRecordsPerThreadInFile+1);
1654      else
1655        ParmArray[th].startRecordNum = gblThreadIndex*baseRecordsPerThreadInFile +
1656                                       threadsWithExtraRecordInFile;
1657      if (CycleData)
1658      {
1659        ParmArray[th].restartRecordNum = ParmArray[th].startRecordNum;
1660        ParmArray[th].maxRecordNum = ParmArray[th].startRecordNum +
1661                                     baseRecordsPerThreadInFile - 1;
1662        if (gblThreadIndex < threadsWithExtraRecordInFile)
1663          ParmArray[th].maxRecordNum += 1;
1664      }
1665      else
1666      {
1667        ParmArray[th].restartRecordNum = 0;
1668        ParmArray[th].maxRecordNum = RecordsInFile - 1;
1669      }
1670    }
1671    else  /* random */
1672    {
1673      ParmArray[th].startRecordNum = 0;
1674      ParmArray[th].restartRecordNum = 0;
1675      ParmArray[th].maxRecordNum = RecordsInFile - 1;
1676    }
1677
1678    len = RecordSize;
1679    if (UseAsyncIO)
1680      len *= AIODepth;
1681    len += PAGE_SIZE;
1682
1683    if (UseSharedMem)
1684    {
1685      int shmid = shmget(IPC_PRIVATE, len, IPC_CREAT | S_IRUSR | S_IWUSR
1686#ifdef GPFS_AIX
1687                          | SHM_LGPAGE | SHM_PIN
1688#endif
1689                         );
1690      if (shmid < 0)
1691      {
1692        fprintf(stderr, "Cannot allocate shared memory buffer for thread %d, errno %d\n",
1693                gblThreadIndex, errno);
1694        exit(1);
1695      }
1696      p = (char *)shmat(shmid, 0, 0);
1697      if (p == (char *)-1)
1698      {
1699        fprintf(stderr, "Cannot attach shared memory buffer for thread %d, errno %d\n",
1700                gblThreadIndex, errno);
1701        shmctl(shmid, IPC_RMID, NULL);
1702        exit(1);
1703      }
1704      ParmArray[th].shmid = shmid;
1705    }
1706    else
1707    {
1708      p = (char*) malloc(len);
1709      if (p == NULL)
1710      {
1711        fprintf(stderr, "Cannot allocate buffer for thread %d\n", gblThreadIndex);
1712        exit(1);
1713      }
1714    }
1715    ParmArray[th].origbufP = p;
1716    ParmArray[th].bufP = (char*)(((ulong)p + PAGE_SIZE - 1) & ~(PAGE_SIZE-1));
1717    memset(ParmArray[th].bufP, (char)gblThreadIndex, RecordSize);
1718    ParmArray[th].threadIndex = th;
1719  }
1720
1721  if (UseAsyncIO)
1722  {
1723#ifdef GPFS_LINUX
1724#ifndef NO_AIO
1725    memset(&st_aioinit, '\0', sizeof(st_aioinit));
1726    st_aioinit.aio_threads = AIODepth;
1727    st_aioinit.aio_num = AIODepth;
1728    st_aioinit.aio_idle_time = 10;
1729    aio_init(&st_aioinit);
1730#endif
1731#endif
1732  }
1733
1734  /* Print message describing test */
1735  if (LabelledOutput  &&
1736      ProcNum == 0)
1737  {
1738    printf("%s %s %s %s\n",
1739           argvPP[0],
1740           TestType==CreateTest ? "create" :
1741             TestType==WriteTest ? "write" :
1742             TestType==ReadTest ? "read" :
1743             "??",
1744           TestPattern==RandPattern ? "rand" :
1745             TestPattern==RandPatternWithHint ? "randhint" :
1746             TestPattern==StridedPattern ? "strided" :
1747             TestPattern==SeqPattern ? "seq" :
1748             "??",
1749           FileNameP);
1750    printf("  recSize %s nBytes %s fileSize %s\n",
1751           int64ToString(RecordSize, buf1),
1752           int64ToString(BytesToTransfer, buf2),
1753           int64ToString((Int64)statBuf.st_size, buf3));
1754    printf("  nProcesses %d nThreadsPerProcess %d\n",
1755           NProcs,
1756           NThreadsPerProcess);
1757    if (TestPattern == StridedPattern  &&
1758        Stride != 0)
1759      printf("  stride %lld records\n", Stride/RecordSize);
1760    printf("  file cache %sflushed before test\n",
1761           DoInvalidate ? "" : "not ");
1762    printf("  %susing data shipping\n",
1763           UseDataShipping ? "" : "not ");
1764    printf("  %susing direct I/O\n",
1765           UseDirectIO ? "" : "not ");
1766    printf("  offsets accessed will%s cycle through the same file segment\n",
1767           CycleData ? "" : " not");
1768    printf("  %susing shared memory buffer\n",
1769           UseSharedMem ? "" : "not ");
1770    if (UseOsync)
1771      printf("  using O_SYNC option\n");
1772    if (UseAsyncIO)
1773      printf("  using AIO to depth %d\n", AIODepth);
1774    printf("  %sreleasing byte-range token after open\n",
1775           DoFreeAll ? "" : "not ");
1776    if (TestType != ReadTest)
1777      printf("  %sfsync at end of test\n",
1778             DoFsync ? "" : "no ");
1779  }
1780
1781#ifdef MULTI_NODE
1782  /* Synchronize all processes before continuing */
1783  rc = MPI_Barrier(MPI_COMM_WORLD);
1784  CheckRC(rc, "start MPI_Barrier");
1785#endif
1786
1787  /* Get start time */
1788  GetTOD(&startTime);
1789
1790#ifdef MULTI_NODE
1791  /* Insure that no process does any work until after the master gets the
1792     start timestamp */
1793  rc = MPI_Barrier(MPI_COMM_WORLD);
1794  CheckRC(rc, "startTime MPI_Barrier");
1795#endif
1796
1797  /* Fork off worker threads, then wait for them all to finish */
1798  pthread_attr_init(&threadAttr);
1799  pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
1800  for (th=0; th<NThreadsPerProcess; th++)
1801  {
1802    rc = pthread_create(&threadIds[th], &threadAttr, TestThreadBody,
1803                        (void *)th);
1804    CheckRC(rc, "pthread_create");
1805  }
1806  for (th=0; th<NThreadsPerProcess; th++)
1807  {
1808    rc = pthread_join(threadIds[th], NULL);
1809    CheckRC(rc, "pthread_join");
1810  }
1811
1812#ifdef MULTI_NODE
1813  /* Synchronize all processes (wait for all threads on all nodes to
1814     finish) */
1815  rc = MPI_Barrier(MPI_COMM_WORLD);
1816  CheckRC(rc, "finish MPI_Barrier");
1817#endif
1818
1819  /* Flush the file to disk if -fsync option was given and this test may
1820     have changed the file.  On version 1.3 of GPFS and later, this only
1821     needs to be done by one process, since fsync by one node flushes the
1822     file globally.  Earlier versions of GPFS must perform the fsync on all
1823     nodes to insure that all data and metadata have been written to disk.
1824     The file has already been closed by the threads on this node, so we
1825     need to reopen the file just to do the fsync. */
1826  if (DoFsync  &&
1827      ProcNum == 0  &&
1828      TestType != ReadTest)
1829  {
1830    handle = open(FileNameP, O_WRONLY, 0666);
1831    if (handle == -1)
1832    {
1833      fprintf(stderr, "Could not open file '%s' to fsync it, errno=%d\n",
1834              FileNameP, errno);
1835      exit(1);
1836    }
1837    rc = fsync(handle);
1838    if (rc == -1)
1839    {
1840      fprintf(stderr, "Error %d from fsync\n", errno);
1841      exit(1);
1842    }
1843    rc = close(handle);
1844    if (rc == -1)
1845    {
1846      fprintf(stderr, "Could not close file '%s' after fsync, errno=%d\n",
1847              FileNameP, errno);
1848      exit(1);
1849    }
1850  }
1851#ifdef MULTI_NODE
1852  if (DoFsync  &&  TestType != ReadTest)
1853  {
1854    rc = MPI_Barrier(MPI_COMM_WORLD);
1855    CheckRC(rc, "fsync MPI_Barrier");
1856  }
1857#endif
1858
1859  /* Get stop time and compute test interval */
1860  GetTOD(&endTime);
1861  totalSeconds = deltaTime(&startTime, &endTime);
1862  if (Verbosity >= 1)
1863    printf("%d: totalSeconds %f\n", ProcNum, totalSeconds);
1864
1865  /* Thread utilization or efficiency: we would like to know if all threads
1866     took the same amount of time or if some threads finished significantly
1867     before others.  For example, suppose the test involved 4 threads and
1868     ran for a total of 10 seconds.  If all four threads required the full
1869     10 seconds to do all of their reads or writes, the utilization would be
1870     (10+10+10+10)/(4*10) = 1.000.  On the other hand, if 3 of the threads
1871     took 4 seconds each and the last one took 9 seconds, the utilization
1872     would be (4+4+4+9)/(4*10) = 0.525.  Gather the thread run time
1873     information from all processes to process 0. */
1874  for (th=0; th<NThreadsPerProcess; th++)
1875    threadTimes[th] = ParmArray[th].xferInterval;
1876  if (ProcNum == 0)
1877    allXferTimesP = (double*)malloc(NProcs*NThreadsPerProcess*sizeof(double));
1878#ifdef MULTI_NODE
1879  rc = MPI_Gather((void*)threadTimes, NThreadsPerProcess*sizeof(double), MPI_BYTE,
1880                  (void*)allXferTimesP, NThreadsPerProcess*sizeof(double), MPI_BYTE,
1881                  0, MPI_COMM_WORLD);
1882  CheckRC(rc, "MPI_Gather");
1883#else
1884  for (i=0; i<NThreadsPerProcess; i++)
1885    allXferTimesP[i] = threadTimes[i];
1886#endif
1887
1888  if (ProcNum == 0)
1889  {
1890    /* Compute the thread utilization according to the definition above */
1891    totalXferTimes = 0.0;
1892    for (i=0; i<NProcs*NThreadsPerProcess; i++)
1893      totalXferTimes += allXferTimesP[i];
1894    threadUtil = totalXferTimes / (NProcs*NThreadsPerProcess*totalSeconds);
1895
1896    /* Print detailed task and thread times */
1897    if (TaskTimes)
1898    {
1899      printf("\n    Task times\n");
1900      printf("      task   thd     time   fract\n");
1901      dP = allXferTimesP;
1902      for (i=0; i<NProcs; i++)
1903        for (j=0; j<NThreadsPerProcess; j++)
1904        {
1905          printf("     %5d %5d %8.2f   %5.3f\n",
1906                 i, j, *dP, *dP/totalSeconds);
1907          dP += 1;
1908        }
1909    }
1910
1911    /* Compute data rate and print results */
1912    dataRate = (BytesToTransfer / totalSeconds) / 1000.0;
1913    if (LabelledOutput)
1914      printf("    Data rate was %.2f Kbytes/sec, thread utilization %.3f\n",
1915             dataRate, threadUtil);
1916    else
1917      printf("%s %s %s %d %lld %lld %d %d %lld %d %d %d %d %d %d %d %d %d %.2f %.3f\n",
1918             TestType==CreateTest ? "create" :
1919               TestType==WriteTest ? "write" :
1920               TestType==ReadTest ? "read" :
1921               "??",
1922             TestPattern==RandPattern ? "rand" :
1923               TestPattern==RandPatternWithHint ? "randhint" :
1924               TestPattern==StridedPattern ? "strided" :
1925               TestPattern==SeqPattern ? "seq" :
1926               "??",
1927             FileNameP,
1928             RecordSize,
1929             BytesToTransfer,
1930             (Int64)statBuf.st_size,
1931             NProcs,
1932             NThreadsPerProcess,
1933             Stride/RecordSize,
1934             DoInvalidate,
1935             UseDataShipping,
1936             UseDirectIO,
1937             UseSharedMem,
1938             DoFsync,
1939             CycleData,
1940             DoFreeAll,
1941             AIODepth,
1942             UseOsync,
1943             dataRate,
1944             threadUtil);
1945
1946  }
1947
1948#ifdef MULTI_NODE
1949  /* Terminate MPI */
1950  MPI_Finalize();
1951#endif
1952
1953  /* Return success */
1954  return 0;
1955}
1956
#ifdef GPFS_LINUX
/* Linux stand-in for the AIX read_real_time() timebase.  It stores the
   current wall-clock time in *tP: whole seconds in tb_high and the
   sub-second part, converted to nanoseconds, in tb_low.  Resolution is
   limited to the microseconds that gettimeofday() reports. */
void
GetTOD(timebasestruct_t *tP)
{
  struct timeval now;

  gettimeofday(&now, NULL);
  tP->tb_low = now.tv_usec * 1000;  /* microseconds -> nanoseconds */
  tP->tb_high = now.tv_sec;
}
#endif
Note: See TracBrowser for help on using the repository browser.