Documentation of class 'SharedQueue':

Class: SharedQueue


Inheritance:

   Object
   |
   +--Collection
      |
      +--Queue
         |
         +--SharedQueue
            |
            +--UnlimitedSharedQueue

Package:
stx:libbasic2
Category:
Kernel-Processes
Version:
rev: 1.54 date: 2019/06/25 12:28:51
user: cg
file: SharedQueue.st directory: libbasic2
module: stx stc-classLibrary: libbasic2
Author:
Claus Gittinger

Description:


SharedQueues provide a safe mechanism for processes to communicate.
They are basically Queues with synchronized access to their internals,
which allows use from multiple processes (i.e. the access methods use
critical regions to protect against inconsistencies caused by a process
switch in the middle of a modification).

SharedQueues can also be used for synchronization, since a reading
process is blocked when it attempts to read from an empty queue, and
a writer is blocked when it attempts to write into a full queue.
To avoid blocking, check #isEmpty before reading and #isFull before writing.
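
A minimal sketch of these non-blocking checks, assuming a single reader and
a single writer (with several processes on either side, another process may
fill or drain the queue between the check and the access, so the access can
still block). The variable names and the element are illustrative only:

    |queue item|

    queue := SharedQueue new:2.

    "write only if there is space, so the writer never waits"
    queue isFull ifFalse:[ queue nextPut:#hello ].

    "read only if there is data, so the reader never waits"
    item := queue isEmpty ifTrue:[nil] ifFalse:[queue next].
    Transcript showCR:item printString.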

Be warned:
    if a reader process adds elements to the SharedQueue from within its
    read loop, it may block when the queue is full, leading to a deadlock.
    The reason is that a SharedQueue's size is fixed, and any write blocks
    while the queue is full.
    In such situations, use an UnlimitedSharedQueue instead, which grows
    rather than blocking the writer.
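
As a sketch of that advice, the reader below puts follow-up work back into
the queue it reads from. It assumes that UnlimitedSharedQueue is created
with new: like SharedQueue and that its nextPut: grows the queue instead of
waiting; the element values are made up for illustration:

    |queue reader|

    queue := UnlimitedSharedQueue new:2.
    queue nextPut:1; nextPut:2.     "the queue is at its initial capacity now"

    reader := [
        |item|

        [ (item := queue next) ~~ #EOF ] whileTrue:[
            "writing from within the read loop;
             a full SharedQueue would block the reader here"
            item == 1 ifTrue:[ queue nextPut:#followUp; nextPut:#EOF ].
            Transcript showCR:item printString.
        ].
    ] fork.

    reader waitUntilTerminated.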
    
See samples in doc/coding.


Related information:

    SharedCollection
    UnlimitedSharedQueue
    Queue
    Semaphore
    Process
    CodingExamples::SharedQueueExamples

Instance protocol:

accessing
o  remove: anElement ifAbsent: exceptionalValue
(comment from inherited method)
remove and return a particular element from the queue;
Return the value from exceptionalValue if the element is not in the queue

o  removeAll
remove all elements in the queue; do not wait, but
synchronize access to the queue.
If the queue was full before, signal space-availability to writers.
This can be used to flush queues in multi-process applications,
when cleanup is required.

o  removeIdentical: anElement ifAbsent: exceptionalValue
(comment from inherited method)
remove and return a particular element from the queue;
Return the value from exceptionalValue if the element is not in the queue

o  removeLast
remove and return the last value in the queue; if it is empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to
writers

accessing-internals
o  accessLock
return the critical access-semaphore which is used internally to synchronize access

o  readSemaphore
return the semaphore which is signalled when data is available
for reading.

o  readWaitWithTimeoutMs: ms
wait up to ms milliseconds for data to become available.
Return true if a timeout occurred, false if data is available.

o  superNextPut: anObject
private; allows subclasses to call the basic nextPut (without synchronization)

o  superNextPutFirst: anObject
private; allows subclasses to call the basic nextPutFirst (without synchronization)

o  withAccessLockedDo: aBlock
evaluate aBlock while access via next/nextPut is blocked.

o  writeSemaphore
return the semaphore which is signalled when the queue has space
for writing.

accessing-reading
o  next
return the next value in the queue; if it is empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to writers

o  nextIfEmpty: exceptionBlock
return the next value in the queue;
if it is empty do not wait, but return the value of exceptionBlock.
When a datum has been removed, signal space-availability to writers

o  nextOrNil
return the next value in the queue;
if it is empty do not wait, but return nil.
When a datum has been removed, signal space-availability to writers

o  nextWithTimeout: secondsOrTimeDurationOrNil
return the next value in the queue; if it is empty, wait until
something is put into the receiver.
When the datum has been removed, signal space-availability to writers.
Answer nil if a timeout occurs.

The argument may be a time duration or the number of seconds as an integer
or float (e.g. use 0.1 for a 100ms timeout).
With a zero timeout, this can be used to poll the queue (returning
the next value if one is available, nil if not).
However, polling is not the intended use of a SharedQueue.
If the argument is nil, wait without timeout (forever).

o  peek
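
The non-waiting and timed reads of this category (nextOrNil, nextIfEmpty:
and nextWithTimeout:) can be sketched as follows. The queue content and the
0.1-second timeout are illustrative; the results noted in the comments
follow from the method comments above:

    |queue a b c|

    queue := SharedQueue new:5.
    queue nextPut:#first.

    a := queue nextOrNil.                   "data available: answers #first"
    b := queue nextIfEmpty:[ #nothing ].    "queue empty: answers the block's value"
    c := queue nextWithTimeout:0.1.         "still empty: answers nil after the timeout"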

accessing-writing
o  nextPut: anObject
append anObject to the end of the queue;
Wait for available space, if the queue is full.
After the put, signal availability of a datum to readers.

o  nextPutFirst: anObject
insert anObject at the beginning of the queue;
Wait for available space, if the queue is full.
After the put, signal availability of a datum to readers.
Insertion at the beginning may be useful to add high-priority elements (for example, in a job scheduler)
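
For example, a job scheduler might queue an urgent job ahead of the waiting
ones. A minimal sketch; the job names are placeholders:

    |jobs firstJob secondJob|

    jobs := SharedQueue new:10.
    jobs nextPut:#routineJob1.
    jobs nextPut:#routineJob2.
    jobs nextPutFirst:#urgentJob.   "goes to the front of the queue"

    firstJob := jobs next.          "the urgent job is read first"
    secondJob := jobs next.         "then the oldest routine job"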

enumerating
o  do: anObject
evaluate the argument, aBlock, for each element in the queue

o  reverseDo: anObject
evaluate the argument, aBlock, for each element in the queue, in reverse order

initialization
o  init: size
initialize the receiver for size entries

private
o  commonWriteWith: aBlock
common code for nextPut / nextPutFirst;
wait for available space, if the queue is full.
After the put, signal availability of a datum to readers.


Examples:


    |queues readers writers seqNumber accessLock accessLock2
     numbersStillToReceive|

    seqNumber := 1.
    accessLock := Semaphore forMutualExclusion.
    accessLock2 := Semaphore forMutualExclusion.

    numbersStillToReceive := BooleanArray new:100000 withAll:true.

    queues := (1 to:10) collect:[:i | SharedQueue new].
    readers := (1 to:10) collect:[:i |
                                    [   |num|
                                        10000 timesRepeat:[
                                            num := (queues at:i) next.
                                            accessLock2 critical:[
                                                (numbersStillToReceive at:num) ifFalse:[
                                                    self halt:(num printString , ' received twice')
                                                ] ifTrue:[
                                                    numbersStillToReceive at:num put:false.
                                                ].
                                            ].
                                            "num printCR."
                                        ].
                                    ] fork
                                 ].
    writers := (1 to:10) collect:[:i |
                                    [   |num|

                                        10000 timesRepeat:[
                                            accessLock critical:[
                                                num := seqNumber.
                                                seqNumber := seqNumber + 1.
                                            ].
                                            (queues at:i) nextPut:num.
                                        ]
                                    ] fork
                                 ].

    readers do:[:aReader | aReader waitUntilTerminated].

    "any left?"
    (numbersStillToReceive includes:true) ifTrue:[
        self halt:'oops - not all numbers received'
    ]

Deadlock example: here, the reader process tries to write into the queue it reads from!

    |queue reader writer|

    queue := SharedQueue new:10.
    reader := 
        [   
            |num|
            
            [ (num := queue next) ~~ #EOF] whileTrue:[
                "here is the bad code: writing into the queue being read;
                 nextPut: waits forever if the queue is full"
                num == 30 ifTrue:[
                    Transcript showCR:'xxx'.
                    queue nextPut:'bad1'.
                    queue nextPut:'bad2'.
                ].
                Transcript showCR:num.
                Delay waitForSeconds:0.01.
            ].
        ] fork.

    writer := 
        [

            1 to:60 do:[:seqNr |
                queue nextPut:seqNr.
            ].
            queue nextPut:#EOF.
        ] fork.

    reader waitUntilTerminated.
    writer waitUntilTerminated.


ST/X 7.2.0.0; WebServer 1.670 at bd0aa1f87cdd.unknown:8081; Fri, 07 Oct 2022 14:54:23 GMT