例を実行する方法
サンプル・プログラムは、マルチスレッド対応のジョブで実行する必要があります。
ジョブで複数のスレッドを許可するように ALWMLTTHD(*YES) を指定して、
SBMJOB を使用してプログラムを呼び出します。
- SBMJOB CMD(CALL MYLIB/THREADMTX) ALWMLTTHD(*YES)
- SBMJOB CMD(CALL MYLIB/THREADSEM) ALWMLTTHD(*YES)
図 1. RPG ソース・
ファイル THREADMTX
/UNDEFINE LOG_ALL_RESULTS
H THREAD(*CONCURRENT) MAIN(threadMtx)
H BNDDIR('QC2LE')
/IF DEFINED(*CRTBNDRPG)
H DFTACTGRP(*NO)
/ENDIF
H OPTION(*SRCSTMT : *NOUNREF)
/COPY QSYSINC/QRPGLESRC,PTHREAD
D NUMTHREADS C 3
D threadMtx PR EXTPGM('THREADMTX')
D mtxThread PR * EXTPROC('mtxThread')
D parm * VALUE
D handleThreads PR EXTPROC('handleThreads')
D checkResults PR EXTPROC('checkResults')
D string 1000A VARYING CONST
D val 10I 0 VALUE
D threadMsg PR EXTPROC('threadMsg')
D string 1000A VARYING CONST
D print PR EXTPROC('print')
D msg 1000A VARYING CONST
D CEETREC PR
D cel_rc_mod 10I 0 OPTIONS(*OMIT)
D user_rc 10I 0 OPTIONS(*OMIT)
D sleep PR EXTPROC(*CWIDEN:'sleep')
D secs 10I 0 VALUE
D fmtThreadId PR 17A VARYING
*-------------------------------------------------------
* Thread-scoped static variables (the STATIC keyword
* is implied because the definition is global)
*-------------------------------------------------------
D psds SDS
D pgmName 10A OVERLAY(psds : 334)
*-------------------------------------------------------
* Job-scoped static variables
*-------------------------------------------------------
* Shared data that will be protected by the mutex
D sharedData S 10I 0 INZ(0)
D STATIC(*ALLTHREAD)
D sharedData2 S 10I 0 INZ(0)
D STATIC(*ALLTHREAD)
* A mutex to control the shared data
D mutex DS LIKEDS(pthread_mutex_t)
D STATIC(*ALLTHREAD)
// Program entry procedure
P threadMtx B
/free
print ('Enter ' + pgmName);
handleThreads ();
print ('Exit ' + pgmName);
/end-free
P threadMtx E
P handleThreads B
D handleThreads PI
D thread DS LIKEDS(pthread_t)
D DIM(NUMTHREADS)
D rc S 10I 0 INZ(0)
D i S 10I 0 INZ(0)
/free
print ('"handleThreads" starting');
print ('Test using a mutex');
// Initialize the mutex
mutex = PTHREAD_MUTEX_INITIALIZER;
print ('Hold Mutex to prevent access to shared data');
rc = pthread_mutex_lock (mutex);
checkResults('pthread_mutex_lock()' : rc);
print ('Create/start threads');
for i = 1 to NUMTHREADS;
rc = pthread_create(thread(i) : *OMIT
: %paddr(mtxThread) : *NULL);
checkResults ('pthread_create()' : rc);
endfor;
print ('Wait a bit until we are "done" with the shared data');
sleep(3);
print ('Unlock shared data');
rc = pthread_mutex_unlock (mutex);
checkResults('pthread_mutex_unlock()' : rc);
print ('Wait for the threads to complete, '
+ 'and release their resources');
for i = 1 to NUMTHREADS;
rc = pthread_join (thread(i) : *OMIT);
checkResults('pthread_join( ' + %char(i) + ')' : rc);
endfor;
print ('Clean up the mutex');
rc = pthread_mutex_destroy (mutex);
print ('"handleThreads" completed');
return;
/end-free
P handleThreads E
P mtxThread B
D mtxThread PI *
D parm * VALUE
D rc S 10I 0
D
/free
threadMsg ('Entered');
rc = pthread_mutex_lock (mutex);
checkResults ('pthread_mutex_lock()' : rc);
//********** Critical Section Begin *******************
threadMsg ('Start critical section, holding lock');
// Access to shared data goes here
sharedData += 1;
sharedData2 -= 1;
threadMsg ('End critical section, release lock');
//********** Critical Section End *******************
rc = pthread_mutex_unlock (mutex);
checkResults ('pthread_mutex_unlock()' : rc);
return *NULL;
/end-free
P mtxThread E
P checkResults B EXPORT
D checkResults PI
D string 1000A VARYING CONST
D val 10I 0 VALUE
D msg S 1000A VARYING
/FREE
if val <> 0;
print (string + ' failed with ' + %char(val));
CEETREC (*OMIT : *OMIT);
else;
/if defined(LOG_ALL_RESULTS)
print (string + ' completed normally with ' + %char(val));
/endif
endif;
/END-FREE
P checkResults E
P threadMsg B EXPORT
D threadMsg PI
D string 1000A VARYING CONST
/FREE
print ('Thread(' + fmtThreadId() + ') ' + string);
/END-FREE
P threadMsg E
P print B EXPORT
D print PI
D msg 1000A VARYING CONST
D printf PR * EXTPROC('printf')
D template * VALUE OPTIONS(*STRING)
D string * VALUE OPTIONS(*STRING)
D dummy * VALUE OPTIONS(*NOPASS)
D NEWLINE C x'15'
/free
printf ('%s' + NEWLINE : msg);
/end-free
P print E
P fmtThreadId B EXPORT
D fmtThreadId PI 17A VARYING
D pthreadId DS LIKEDS(pthread_id_np_t)
D buf S 1000A
D sprintf PR * EXTPROC('sprintf')
D buf * VALUE
D template * VALUE OPTIONS(*STRING)
D num1 10U 0 VALUE
D num2 10U 0 VALUE
D dummy * OPTIONS(*NOPASS)
/FREE
pthreadId = pthread_getthreadid_np();
// get the hex form of the 2 parts of the thread-id
// in "buf", null-terminated
sprintf (%addr(buf)
: '%.8x %.8x'
: pthreadId.intId.hi
: pthreadId.intId.lo);
return %str(%addr(buf));
/END-FREE
P fmtThreadId E
図 2. セマフォーの使用を示す RPG プログラム THREADSEM
/UNDEFINE LOG_ALL_RESULTS
H THREAD(*CONCURRENT) MAIN(threadSem)
H BNDDIR('QC2LE')
/IF DEFINED(*CRTBNDRPG)
H DFTACTGRP(*NO)
/ENDIF
H OPTION(*SRCSTMT : *NOUNREF)
/COPY QSYSINC/QRPGLESRC,PTHREAD
/COPY QSYSINC/QRPGLESRC,SYSSEM
/COPY QSYSINC/QRPGLESRC,SYSSTAT
D NUMTHREADS C 3
D threadSem PR EXTPGM('THREADSEM')
D semThreadParm_t...
D DS QUALIFIED TEMPLATE
D val 10I 0
D result 10I 0
D semThread PR * EXTPROC('semThread')
D parm LIKEDS(semThreadParm_t)
D handleThreads PR EXTPROC('handleThreads')
D checkResults PR EXTPROC('checkResults')
D string 1000A VARYING CONST
D val 10I 0 VALUE
D checkResultsErrno...
D PR EXTPROC('checkResultsErrno')
D string 1000A VARYING CONST
D cond N VALUE
D threadMsg PR EXTPROC('threadMsg')
D string 1000A VARYING CONST
D print PR EXTPROC('print')
D msg 1000A VARYING CONST
D CEETREC PR
D cel_rc_mod 10I 0 OPTIONS(*OMIT)
D user_rc 10I 0 OPTIONS(*OMIT)
D sleep PR EXTPROC(*CWIDEN:'sleep')
D secs 10I 0 VALUE
D fmtThreadId PR 17A VARYING
*-------------------------------------------------------
* Thread-scoped static variables (the STATIC keyword
* is implied because the definition is global)
*-------------------------------------------------------
D psds SDS
D pgmName 10A OVERLAY(psds : 334)
*-------------------------------------------------------
* Job-scoped static variables
*-------------------------------------------------------
* Shared data that will be protected by the mutex
D sharedData S 10I 0 INZ(0)
D STATIC(*ALLTHREAD)
D sharedData2 S 10I 0 INZ(0)
D STATIC(*ALLTHREAD)
* A semaphore to control the shared data
D semaphoreId S 10I 0 STATIC(*ALLTHREAD)
* Simple lock operation. 0=which-semaphore, -1=decrement, 0=noflags
* Will be set to { 0, -1, 0} in main procedure before threads are created
D lockOperation DS LIKEDS(struct_sembuf)
D DIM(1)
D STATIC(*ALLTHREAD)
* Simple unlock operation. 0=which-semaphore, 1=increment, 0=noflags
* Will be set to { 0, 1, 0} in main procedure before threads are created
D unlockOperation...
D DS LIKEDS(struct_sembuf)
D DIM(1)
D STATIC(*ALLTHREAD)
// Program entry procedure
P threadSem B
/free
print ('Enter ' + pgmName);
handleThreads ();
print ('Exit ' + pgmName);
/end-free
P threadSem E
P handleThreads B
D handleThreads PI
D thread DS LIKEDS(pthread_t)
D DIM(NUMTHREADS)
D rc S 10I 0 INZ(0)
D i S 10I 0 INZ(0)
D parms DS LIKEDS(semThreadParm_t)
D DIM(NUMTHREADS)
/free
print ('"handleThreads" starting');
print ('Test using a semaphore');
lockOperation(1).sem_num = 0;
lockOperation(1).sem_op = -1;
lockOperation(1).sem_flg = 0;
unlockOperation(1).sem_num = 0;
unlockOperation(1).sem_op = 1;
unlockOperation(1).sem_flg = 0;
// Create a private semaphore set with 1
// semaphore that only I can use
semaphoreId = semget(IPC_PRIVATE : 1 : 0 + S_IRUSR + S_IWUSR);
checkResultsErrno ('semget' : semaphoreId >= 0);
// Set the semaphore count to 1.
// Simulate a mutex
rc = semctl(semaphoreId : 0 : CMD_SETVAL : 1);
checkResults('semctl(SETVAL)' : rc);
print ('Wait on semaphore to prevent access to shared data');
rc = semop(semaphoreId : lockOperation(1) : 1);
checkResultsErrno('main semop(lock)': rc = 0);
parms(1).val = 5;
parms(2).val = -10;
parms(3).val = 421;
print ('Create/start threads');
for i = 1 to NUMTHREADS;
rc = pthread_create(thread(i) : *OMIT
: %paddr(semThread) : %addr(parms(i)));
checkResults ('pthread_create()' : rc);
endfor;
print ('Wait a bit until we are "done" with the shared data');
sleep (3);
print ('Unlock shared data');
rc = semop (semaphoreId : unlockOperation(1) : 1);
checkResultsErrno ('main semop(unlock)' : rc = 0);
print ('Wait for the threads to complete, '
+ 'and release their resources');
for i = 1 to NUMTHREADS;
rc = pthread_join (thread(i) : *OMIT);
checkResults('pthread_join( ' + %char(i) + ')' : rc);
endfor;
print ('Clean up the semaphore');
rc = semctl(semaphoreId : 0 : IPC_RMID);
checkResults ('semctl(removeID)' : rc);
print ('Result(1) = ' + %char(parms(1).result));
print ('Result(2) = ' + %char(parms(2).result));
print ('Result(3) = ' + %char(parms(3).result));
print ('"handleThreads" completed');
return;
/end-free
P handleThreads E
P semThread B
D semThread PI *
D parm LIKEDS(semThreadParm_t)
D rc S 10I 0
D
/free
threadMsg ('Entered + parm.val = ' + %char(parm.val));
// Set the output subfields of the parameter
parm.result = parm.val * 2;
rc = semop (semaphoreId : lockOperation(1) : 1);
checkResultsErrno ('thread semop(lock)' : rc = 0);
//********** Critical Section Begin *******************
threadMsg ('Start critical section, holding semaphore');
// Access to shared data goes here
sharedData += 1;
sharedData2 -= 1;
threadMsg ('End critical section, release semaphore');
//********** Critical Section End *******************
rc = semop (semaphoreId : unlockOperation(1) : 1);
checkResultsErrno ('thread semop(unlock)' : rc = 0);
threadMsg ('Exiting');
return *NULL;
/end-free
P semThread E
P checkResults B EXPORT
D checkResults PI
D string 1000A VARYING CONST
D val 10I 0 VALUE
D msg S 1000A VARYING
/FREE
if val <> 0;
print (string + ' failed with ' + %char(val));
CEETREC (*OMIT : *OMIT);
else;
/if defined(LOG_ALL_RESULTS)
print (string + ' completed normally with ' + %char(val));
/endif
endif;
/END-FREE
P checkResults E
P checkResultsErrno...
P B
D checkResultsErrno...
D PI
D string 1000A VARYING CONST
D cond N VALUE
D getErrnoPtr PR * EXTPROC('__errno')
D errnoVal S 10I 0 based(threadErrnoPtr)
/FREE
if not cond;
threadErrnoPtr = getErrnoPtr();
print (string + ' Errno(' + %char(errnoVal) + ')');
CEETREC (*OMIT : *OMIT);
else;
/if defined(LOG_ALL_RESULTS)
print (string + ' completed normally');
/endif
endif;
/END-FREE
P checkResultsErrno...
P E
P threadMsg B EXPORT
D threadMsg PI
D string 1000A VARYING CONST
/FREE
print ('Thread(' + fmtThreadId() + ') ' + string);
/END-FREE
P threadMsg E
P print B EXPORT
D print PI
D msg 1000A VARYING CONST
D printf PR * EXTPROC('printf')
D template * VALUE OPTIONS(*STRING)
D string * VALUE OPTIONS(*STRING)
D dummy * VALUE OPTIONS(*NOPASS)
D NEWLINE C x'15'
/free
printf ('%s' + NEWLINE : msg);
/end-free
P print E
P fmtThreadId B EXPORT
D fmtThreadId PI 17A VARYING
D pthreadId DS LIKEDS(pthread_id_np_t)
D buf S 1000A
D sprintf PR * EXTPROC('sprintf')
D buf * VALUE
D template * VALUE OPTIONS(*STRING)
D num1 10U 0 VALUE
D num2 10U 0 VALUE
D dummy * OPTIONS(*NOPASS)
/FREE
pthreadId = pthread_getthreadid_np();
// get the hex form of the 2 parts of the thread-id
// in "buf", null-terminated
sprintf (%addr(buf)
: '%.8x %.8x'
: pthreadId.intId.hi
: pthreadId.intId.lo);
return %str(%addr(buf));
/END-FREE
P fmtThreadId E