Class: SharedQueue
Object
|
+--Collection
   |
   +--Queue
      |
      +--SharedQueue
         |
         +--UnlimitedSharedQueue
- Package: stx:libbasic2
- Category: Kernel-Processes
- Version: rev: 1.74, date: 2024/01/22 11:13:58
- user: stefan
- file: SharedQueue.st, directory: libbasic2
- module: stx, stc-classLibrary: libbasic2
SharedQueues provide a safe mechanism for processes to communicate.
They are basically Queues, with added secure access to the internals,
allowing use from multiple processes (i.e. the access methods use
critical regions to protect against confusion due to a process
switch within a modification).
Also, sharedQueues can be used for synchronization, since a reading
process will be blocked when attempting to read an empty queue, while
a writer will be blocked when attempting to write into a full queue.
For a nonblocking read, check #isEmpty first; for a nonblocking write, check #isFull first.
Be warned:
if a reader process adds elements to the SharedQueue inside its
read loop, it may block when the queue is full, leading to a deadlock.
The reason is that a SharedQueue's size is fixed, and any write blocks
while the queue is full.
For such situations, please use an UnlimitedSharedQueue, which grows in that
particular situation.
See samples in doc/coding.
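The blocking behavior described above can be sketched as a minimal producer/consumer pair. This is an illustrative sketch, not one of the samples from doc/coding; the capacity of 5, the #done end marker and all variable names are assumptions chosen for the example.

```smalltalk
|queue consumer producer received|

queue := SharedQueue new:5.         "small capacity, so the producer blocks at times"
received := OrderedCollection new.

consumer := [
    |item|
    "next blocks while the queue is empty"
    [ (item := queue next) ~~ #done ] whileTrue:[
        received add:item.
    ].
] fork.

producer := [
    "nextPut: blocks while the queue is full"
    1 to:20 do:[:n | queue nextPut:n].
    queue nextPut:#done.            "hypothetical end marker, not part of the API"
] fork.

consumer waitUntilTerminated.
producer waitUntilTerminated.
self assert:received size = 20.
```

Neither process polls: the queue's semaphores suspend the reader on an empty queue and the writer on a full one.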
copyright
COPYRIGHT (c) 1993 by Claus Gittinger
All Rights Reserved
This software is furnished under a license and may be used
only in accordance with the terms of that license and with the
inclusion of the above copyright notice. This software may not
be provided or otherwise made available to, or used by, any
other person. No title to or ownership of the software is
hereby transferred.
accessing
-
removeAll
-
remove all elements from the queue;
do not wait, but synchronize access to the queue.
If the queue was full before, signal space-availability to writers.
This can be used to flush queues in multi-process applications,
when cleanup is required.
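A minimal sketch of flushing a queue during cleanup, assuming a queue that still holds unread elements (the values are arbitrary):

```smalltalk
|q|

q := SharedQueue new:10.
q nextPut:1; nextPut:2; nextPut:3.
q removeAll.                "does not wait; discards the three pending elements"
self assert:q isEmpty.
```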
-
removeAllElementsForWhich: ablock ifAbsent: exceptionalValue
-
remove all elements from the queue for which ablock returns true.
Return the value of exceptionalValue if no such element is in the queue;
otherwise, return the last removed element.
-
removeLast
-
return the last value in the queue;
if it is empty, wait until something is put into the receiver.
When the datum has been removed, signal space-availability to writers
accessing-internals
-
readSemaphore
-
return the semaphore which is signalled when data is available
for reading.
-
readWaitWithTimeoutMs: ms
-
Return true if a timeout occurred (i.e. false, if data is available).
-
superNextPut: anObject
-
private; to allow subclasses to call the basic nextPut (without synchronization)
-
superNextPutFirst: anObject
-
private; to allow subclasses to call the basic nextPutFirst (without synchronization)
-
synchronizationSemaphore
-
return a synchronization semaphore for myself;
this will be used by synchronized:[...]
-
writeSemaphore
-
return the semaphore which is signalled when the queue has space
for writing.
accessing-reading
-
next
-
return the next value in the queue; if it is empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to writers
-
nextIfEmpty: exceptionBlock
-
return the next value in the queue;
if it is empty do not wait, but return the value of exceptionBlock.
When a datum has been removed, signal space-availability to writers
-
nextOrNil
-
return the next value in the queue;
if it is empty do not wait, but return nil.
When a datum has been removed, signal space-availability to writers
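The nonblocking readers can be used to drain whatever is currently queued without ever suspending the caller. A minimal sketch, with arbitrary element values and a hypothetical #nothing fallback:

```smalltalk
|q item drained|

q := SharedQueue new:10.
q nextPut:#a; nextPut:#b.
drained := OrderedCollection new.

"nextOrNil never waits; nil means the queue is currently empty"
[ (item := q nextOrNil) notNil ] whileTrue:[
    drained add:item.
].
self assert:drained size = 2.

"nextIfEmpty: evaluates the block instead of waiting"
self assert:(q nextIfEmpty:[#nothing]) == #nothing.
```

Note that a loop like this cannot tell a queued nil element from an empty queue; nextIfEmpty: with a distinct marker avoids that ambiguity.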
-
nextWithTimeout: secondsOrTimeDurationOrNil
-
return the next value in the queue; if it is empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to writers.
Answer nil if a timeout occurs.
The argument may be a time duration or the number of seconds as integer
or float (i.e. use 0.1 for a 100ms timeout).
With zero timeout, this can be used to poll the queue (returning
the next value if one is available, nil if not).
However, polling is not the intended use of a shared queue.
If the argument is nil, wait without timeout (forever).
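A minimal sketch of a timed read, assuming a single process and a timeout of 0.1 seconds (both chosen only for illustration):

```smalltalk
|q result|

q := SharedQueue new:10.

"nothing has been written, so this answers nil once the timeout expires"
result := q nextWithTimeout:0.1.
self assert:result isNil.

q nextPut:42.
"data is available, so no waiting is needed"
result := q nextWithTimeout:0.1.
self assert:result = 42.
```

Because nil is also the timeout answer, a queue that may legitimately contain nil elements cannot distinguish the two cases with this method.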
-
peek
-
(comment from inherited method)
return the next value in the queue without removing it.
If the queue is empty, return nil.
accessing-writing
-
nextPut: anObject
-
enter anObject to the end of the queue;
Wait for available space, if the queue is full.
After the put, signal availability of a datum to readers.
Answer anObject
-
nextPutAll: aList
-
enter all the objects in the list to the end of the queue;
Wait for available space, if the queue is full.
After the put, signal availability of n datums to readers.
Note: the whole list is written to the queue if there is space for at least one element.
So the queue may temporarily become larger than initially allocated.
Usage example(s):
    |q|

    q := SharedQueue new:100.
    q nextPutAll:(1 to:200).
    200 timesRepeat:[q next].
    self assert:q isEmpty.
-
nextPutFirst: anObject
-
insert anObject at the beginning of the queue;
Wait for available space, if the queue is full.
After the put, signal availability of a datum to readers.
Insertion at the beginning may be useful to add high-priority elements (for example, in a job scheduler)
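The job-scheduler use mentioned above can be sketched as follows. The job names are hypothetical symbols standing in for real job objects:

```smalltalk
|jobs|

jobs := SharedQueue new:10.
jobs nextPut:#routineJob1.
jobs nextPut:#routineJob2.
jobs nextPutFirst:#urgentJob.       "inserted ahead of the jobs already waiting"

self assert:jobs next == #urgentJob.
self assert:jobs next == #routineJob1.
self assert:jobs next == #routineJob2.
```

Several nextPutFirst: calls in a row are read back in reverse order of insertion, since each one goes to the very front.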
-
nonBlockingNextPut: aValue
-
add data to the queue, but do not block if the queue is full
converting
-
asSharedCollection
-
I am already thread safe
-
asSharedCollectionWithLock: aRecursionLock
-
I am already thread safe, but exchange the lock
enumerating
-
do: anObject
-
evaluate the argument, aBlock, for each element in the queue
-
reverseDo: anObject
-
evaluate the argument, aBlock, for each element in the queue, in reverse order
initialization
-
init: size
-
initialize the receiver for size entries
obsolete synchronized evaluation
-
accessLock
-
marked as obsolete by Stefan Vogel at 22-Mar-2022
** This is an obsolete interface - do not use it (it may vanish in future versions) **
-
withAccessLockedDo: aBlock
-
marked as obsolete by Stefan Vogel at 20-Jul-2021
** This is an obsolete interface - do not use it (it may vanish in future versions) **
private
-
commonWriteWith: aBlock
-
common code for nextPut / nextPutFirst;
wait for available space, if the queue is full.
After the put, signal availability of a datum to readers.
stress example:
ten writers each feed one of ten queues; every number must be received exactly once

    |queues readers writers seqNumber accessLock accessLock2
     numbersStillToReceive|

    seqNumber := 1.
    accessLock := Semaphore forMutualExclusion.
    accessLock2 := Semaphore forMutualExclusion.

    numbersStillToReceive := BooleanArray new:100000 withAll:true.

    queues := (1 to:10) collect:[:i | SharedQueue new].
    readers := (1 to:10) collect:[:i |
                    [   |num|
                        10000 timesRepeat:[
                            num := (queues at:i) next.
                            accessLock2 critical:[
                                (numbersStillToReceive at:num) ifFalse:[
                                    self halt:(num printString , ' received twice')
                                ] ifTrue:[
                                    numbersStillToReceive at:num put:false.
                                ].
                            ].
                            'num printCR.'.
                        ].
                    ] fork
                ].

    writers := (1 to:10) collect:[:i |
                    [   |num|
                        10000 timesRepeat:[
                            accessLock critical:[
                                num := seqNumber.
                                seqNumber := seqNumber + 1.
                            ].
                            (queues at:i) nextPut:num.
                        ]
                    ] fork
                ].

    readers do:[:aReader | aReader waitUntilTerminated].

    ' any left ? '.
    (numbersStillToReceive includes:true) ifTrue:[
        self halt:'oops - not all numbers received'
    ]
deadlock example:
here, a read process tries to write!

    |queue reader writer|

    queue := SharedQueue new:10.
    reader :=
        [
            |num|

            [ (num := queue next) ~~ #EOF] whileTrue:[
                'here is the bad code: writing into the queue !'.
                num == 30 ifTrue:[
                    Transcript showCR:'xxx'.
                    queue nextPut:'bad1'.
                    queue nextPut:'bad2'.
                ].
                Transcript showCR:num.
                Delay waitForSeconds:0.01.
            ].
        ] fork.

    writer :=
        [   |num|
            1 to:60 do:[:seqNr |
                queue nextPut:seqNr.
            ].
            queue nextPut:#EOF.
        ] fork.

    reader waitUntilTerminated.
    writer waitUntilTerminated.
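The class description recommends an UnlimitedSharedQueue when a reader also writes. A minimal sketch of the deadlock example adapted accordingly; it assumes that UnlimitedSharedQueue can be created with a plain `new`, and the 'extra1'/'extra2' strings are placeholders:

```smalltalk
|queue reader writer|

"assumption: UnlimitedSharedQueue is instantiated like a SharedQueue"
queue := UnlimitedSharedQueue new.

reader := [
    |num|
    [ (num := queue next) ~~ #EOF ] whileTrue:[
        num == 30 ifTrue:[
            "writing from the reader is expected not to block, since the queue grows"
            queue nextPut:'extra1'.
            queue nextPut:'extra2'.
        ].
        Transcript showCR:num printString.
    ].
] fork.

writer := [
    1 to:60 do:[:seqNr | queue nextPut:seqNr].
    queue nextPut:#EOF.
] fork.

reader waitUntilTerminated.
writer waitUntilTerminated.
```

If the writer has already queued #EOF when the reader adds its extra elements, those elements remain unread in the queue; that is harmless here, but a real protocol would need its own end-of-data convention.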