|
Class: SharedQueue
Object
|
+--Collection
|
+--Queue
|
+--SharedQueue
|
+--UnlimitedSharedQueue
- Package:
- stx:libbasic2
- Category:
- Kernel-Processes
- Version:
- rev:
1.54
date: 2019/06/25 12:28:51
- user: cg
- file: SharedQueue.st directory: libbasic2
- module: stx stc-classLibrary: libbasic2
- Author:
- Claus Gittinger
SharedQueues provide a safe mechanism for processes to communicate.
They are basically Queues, with added secure access to the internals,
allowing use from multiple processes (i.e. the access methods use
critical regions to protect against confusion due to a process
switch within a modification).
Also, sharedQueues can be used for synchronization, since a reading
process will be blocked when attempting to read an empty queue, while
a writer will be blocked when attempting to write into a full queue.
For nonBlocking read, use #isEmpty; for nonBlocking write, use #isFull.
Be warned:
if a reader process wants to add elements to the sharedqueue in its
read-loop, it may block, if the queue is full, leading to a deadlock.
The reason is that the sharedQueues size is fixed, and any write is blocked
if the queue is full.
For this situations, please use an UnlimitedSharedQueue, which grows in that
particular situation.
See samples in doc/coding.
SharedCollection
UnlimitedSharedQueue
Queue
Semaphore
Process
CodingExamples::SharedQueueExamples
accessing
-
remove: anElement ifAbsent: exceptionalValue
-
(comment from inherited method)
remove and return a particular element from the queue;
Return the value from exceptionalValue if the element is not in the queue
-
removeAll
-
remove all elements in the queue; do not wait, but
synchronize access to the queue.
If the queue was full before, signal space-availability to writers.
This can be used to flush queues in multi-process applications,
when cleanup is required.
-
removeIdentical: anElement ifAbsent: exceptionalValue
-
(comment from inherited method)
remove and return a particular element from the queue;
Return the value from exceptionalValue if the element is not in the queue
-
removeLast
-
return the last value in the queue; if it its empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to
writers
accessing-internals
-
accessLock
-
return the critical access-semaphore which is used internally to synchronize access
-
readSemaphore
-
return the semaphore which is signalled when data is available
for reading.
-
readWaitWithTimeoutMs: ms
-
Return true if a timeout occurred (i.e. false, if data is available).
-
superNextPut: anObject
-
private; to allow subclasses to call the basic nextPut (w.o. synchronization)
-
superNextPutFirst: anObject
-
private; to allow subclasses to call the basic nextPutFirst (w.o. synchronization)
-
withAccessLockedDo: aBlock
-
evaluate aBlock while access via next/nextPut are blocked.
-
writeSemaphore
-
return the semaphore which is signalled when the queue has space
for writing.
accessing-reading
-
next
-
return the next value in the queue; if it its empty, wait 'til
something is put into the receiver.
When the datum has been removed, signal space-availability to writers
-
nextIfEmpty: exceptionBlock
-
return the next value in the queue;
if it is empty do not wait, but return the value of exceptionBlock.
When a datum has been removed, signal space-availability to writers
-
nextOrNil
-
return the next value in the queue;
if it is empty do not wait, but return nil.
When a datum has been removed, signal space-availability to writers
-
nextWithTimeout: secondsOrTimeDurationOrNil
-
return the next value in the queue; if it its empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to writers.
Answer nil if a timeout occurs.
The argument may be a time duration or the number of seconds as integer
or float (i.e. use 0.1 for a 100ms timeout).
With zero timeout, this can be used to poll a semaphore (returning
the receiver if the semaphore is available, nil if not).
However, polling is not the intended use of semaphores, though.
If the argument is nil, wait without timeout (forever).
-
peek
-
accessing-writing
-
nextPut: anObject
-
enter anObject to the end of the queue;
Wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.
-
nextPutFirst: anObject
-
insert anObject at the beginning of the queue;
Wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.
Insertion at the beginning may be useful to add hi-prio elements (for example, in a job-scheduler)
enumerating
-
do: anObject
-
evaluate the argument, aBlock for each element in the queue
-
reverseDo: anObject
-
evaluate the argument, aBlock for each element in the queue
initialization
-
init: size
-
initialize the receiver for size entries
private
-
commonWriteWith: aBlock
-
common code for nextPut / nextPutFirst;
wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.
|queues readers writers seqNumber accessLock accessLock2
numbersStillToReceive|
seqNumber := 1.
accessLock := Semaphore forMutualExclusion.
accessLock2 := Semaphore forMutualExclusion.
numbersStillToReceive := BooleanArray new:100000 withAll:true.
queues := (1 to:10) collect:[:i | SharedQueue new].
readers := (1 to:10) collect:[:i |
[ |num|
10000 timesRepeat:[
num := (queues at:i) next.
accessLock2 critical:[
(numbersStillToReceive at:num) ifFalse:[
self halt:(num printString , ' received twice')
] ifTrue:[
numbersStillToReceive at:num put:false.
].
].
'num printCR.'.
].
] fork
].
writers := (1 to:10) collect:[:i |
[ |num|
10000 timesRepeat:[
accessLock critical:[
num := seqNumber.
seqNumber := seqNumber + 1.
].
(queues at:i) nextPut:num.
]
] fork
].
readers do:[:aReader | aReader waitUntilTerminated].
' any left ? '.
(numbersStillToReceive includes:true) ifTrue:[
self halt:'oops - not all numbers received'
]
|
deadlock example:
here, a read process tries to write !
|queue reader writer|
queue := SharedQueue new:10.
reader :=
[
|num|
[ (num := queue next) ~~ #EOF] whileTrue:[
'here is the bad code: writing into the queue !'.
num == 30 ifTrue:[
Transcript showCR:'xxx'.
queue nextPut:'bad1'.
queue nextPut:'bad2'.
].
Transcript showCR:num.
Delay waitForSeconds:0.01.
].
] fork.
writer :=
[ |num|
1 to:60 do:[:seqNr |
queue nextPut:seqNr.
].
queue nextPut:#EOF.
] fork.
reader waitUntilTerminated.
writer waitUntilTerminated.
|
|