eXept Software AG Logo

Smalltalk/X Webserver

Documentation of class 'SharedQueue':

Home

Documentation
www.exept.de
Everywhere
for:
[back]

Class: SharedQueue


Inheritance:

   Object
   |
   +--Collection
      |
      +--Queue
         |
         +--SharedQueue
            |
            +--UnlimitedSharedQueue

Package:
stx:libbasic2
Category:
Kernel-Processes
Version:
rev: 1.74 date: 2024/01/22 11:13:58
user: stefan
file: SharedQueue.st directory: libbasic2
module: stx stc-classLibrary: libbasic2

Description:


SharedQueues provide a safe mechanism for processes to communicate.
They are basically Queues, with added secure access to the internals,
allowing use from multiple processes (i.e. the access methods use
critical regions to protect against confusion due to a process
switch within a modification).

Also, sharedQueues can be used for synchronization, since a reading
process will be blocked when attempting to read an empty queue, while
a writer will be blocked when attempting to write into a full queue.
For nonBlocking read, use #isEmpty; for nonBlocking write, use #isFull.

Be warned:
    if a reader process wants to add elements to the sharedqueue in its
    read-loop, it may block, if the queue is full, leading to a deadlock.
    The reason is that the sharedQueues size is fixed, and any write is blocked
    if the queue is full.
    For this situations, please use an UnlimitedSharedQueue, which grows in that
    particular situation.
    
See samples in doc/coding.

copyright

COPYRIGHT (c) 1993 by Claus Gittinger All Rights Reserved This software is furnished under a license and may be used only in accordance with the terms of that license and with the inclusion of the above copyright notice. This software may not be provided or otherwise made available to, or used by, any other person. No title to or ownership of the software is hereby transferred.

Instance protocol:

accessing
o  removeAll
remove all elements from the queue;
do not wait, but synchronize access to the queue.
If the queue was full before, signal space-availability to writers.
This can be used to flush queues in multi-process applications,
when cleanup is required.

o  removeAllElementsForWhich: ablock ifAbsent: exceptionalValue
remove all elements from the queue for which checkBlock returns true;
Return the value from exceptionalValue if no such element is in the queue;
otherwise, return the last removed element

o  removeLast
return the last value in the queue;
if it its empty, wait until something is put into the receiver.
When the datum has been removed, signal space-availability to writers

accessing-internals
o  readSemaphore
return the semaphore which is signalled when data is available
for reading.

o  readWaitWithTimeoutMs: ms
Return true if a timeout occurred (i.e. false, if data is available).

o  superNextPut: anObject
private; to allow subclasses to call the basic nextPut (w.o. synchronization)

o  superNextPutFirst: anObject
private; to allow subclasses to call the basic nextPutFirst (w.o. synchronization)

o  synchronizationSemaphore
return a synchronization semaphore for myself;
this will be used by synchronized:[...]

o  writeSemaphore
return the semaphore which is signalled when the queue has space
for writing.

accessing-reading
o  next
return the next value in the queue; if it its empty, wait 'til
something is put into the receiver.
When the datum has been removed, signal space-availability to writers

o  nextIfEmpty: exceptionBlock
return the next value in the queue;
if it is empty do not wait, but return the value of exceptionBlock.
When a datum has been removed, signal space-availability to writers

o  nextOrNil
return the next value in the queue;
if it is empty do not wait, but return nil.
When a datum has been removed, signal space-availability to writers

o  nextWithTimeout: secondsOrTimeDurationOrNil
return the next value in the queue; if it its empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to writers.
Answer nil if a timeout occurs.

The argument may be a time duration or the number of seconds as integer
or float (i.e. use 0.1 for a 100ms timeout).
With zero timeout, this can be used to poll a semaphore (returning
the receiver if the semaphore is available, nil if not).
However, polling is not the intended use of semaphores, though.
If the argument is nil, wait without timeout (forever).

o  peek
(comment from inherited method)
return the next value in the queue without removing it.
If the queue is empty, return nil.

accessing-writing
o  nextPut: anObject
enter anObject to the end of the queue;
Wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.
Answer anObject

o  nextPutAll: aList
enter all the Object in the list to the end of the queue;
Wait for available space, if the queue is full.
After the put, signal availablity of n datums to readers.
Note: the whole list is written to the queue if there is space for at least one element.
So the queue may become temporary larger than initially allocated.

Usage example(s):

        |q|

        q := SharedQueue new:100.
        q nextPutAll:(1 to:200).
        200 timesRepeat:[q next].
        self assert:q isEmpty.

o  nextPutFirst: anObject
insert anObject at the beginning of the queue;
Wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.
Insertion at the beginning may be useful to add hi-prio elements (for example, in a job-scheduler)

o  nonBlockingNextPut: aValue
add data to the queue, but do not block if the queue is full

converting
o  asSharedCollection
I am already thread safe

o  asSharedCollectionWithLock: aRecursionLock
I am already thread safe, but exchange the lock

enumerating
o  do: anObject
evaluate the argument, aBlock for each element in the queue

o  reverseDo: anObject
evaluate the argument, aBlock for each element in the queue

initialization
o  init: size
initialize the receiver for size entries

obsolete synchronized evaluation
o  accessLock
marked as obsolete by Stefan Vogel at 22-Mrz-2022

** This is an obsolete interface - do not use it (it may vanish in future versions) **

o  withAccessLockedDo: aBlock
marked as obsolete by Stefan Vogel at 20-Jul-2021

** This is an obsolete interface - do not use it (it may vanish in future versions) **

private
o  commonWriteWith: aBlock
common code for nextPut / nextPutFirst;
wait for available space, if the queue is full.
After the put, signal availablity of a datum to readers.


Examples:


    |queues readers writers seqNumber accessLock accessLock2
     numbersStillToReceive|

    seqNumber := 1.
    accessLock := Semaphore forMutualExclusion.
    accessLock2 := Semaphore forMutualExclusion.

    numbersStillToReceive := BooleanArray new:100000 withAll:true.

    queues := (1 to:10) collect:[:i | SharedQueue new].
    readers := (1 to:10) collect:[:i |
                                    [   |num|
                                        10000 timesRepeat:[
                                            num := (queues at:i) next.
                                            accessLock2 critical:[
                                                (numbersStillToReceive at:num) ifFalse:[
                                                    self halt:(num printString , ' received twice')
                                                ] ifTrue:[
                                                    numbersStillToReceive at:num put:false.
                                                ].
                                            ].
                                            'num printCR.'.
                                        ].
                                    ] fork
                                 ].
    writers := (1 to:10) collect:[:i |
                                    [   |num|

                                        10000 timesRepeat:[
                                            accessLock critical:[
                                                num := seqNumber.
                                                seqNumber := seqNumber + 1.
                                            ].
                                            (queues at:i) nextPut:num.
                                        ]
                                    ] fork
                                 ].

    readers do:[:aReader | aReader waitUntilTerminated].

    ' any left ? '.
    (numbersStillToReceive includes:true) ifTrue:[
        self halt:'oops - not all numbers received'
    ]
deadlock example: here, a read process tries to write !
    |queue reader writer|

    queue := SharedQueue new:10.
    reader := 
        [   
            |num|
            
            [ (num := queue next) ~~ #EOF] whileTrue:[
                'here is the bad code: writing into the queue !'.
                num == 30 ifTrue:[
                    Transcript showCR:'xxx'.
                    queue nextPut:'bad1'.
                    queue nextPut:'bad2'.
                ].
                Transcript showCR:num.
                Delay waitForSeconds:0.01.
            ].
        ] fork.

    writer := 
        [   |num|

            1 to:60 do:[:seqNr |
                queue nextPut:seqNr.
            ].
            queue nextPut:#EOF.
        ] fork.

    reader waitUntilTerminated.
    writer waitUntilTerminated.


ST/X 7.7.0.0; WebServer 1.702 at 20f6060372b9.unknown:8081; Wed, 22 Jan 2025 07:57:44 GMT