Producent-konsument problem
Inom databehandling är producent -konsumentproblemet (även känt som bounded-buffer-problemet ) en familj av problem som beskrivits av Edsger W. Dijkstra sedan 1965.
Dijkstra hittade lösningen på producent-konsumentproblemet när han arbetade som konsult för Electrologica X1 och X8-datorerna: "Den första användningen av producent-konsument var delvis mjukvara, delvis hårdvara: Komponenten som skötte informationstransporten mellan butik och butik. kringutrustning kallades "en kanal" ... Synkronisering styrdes av två räkne semaforer i vad vi nu känner som producent/konsument arrangemang: den ena semaforen som indikerar längden på kön, ökades (i ett V) av CPU och dekrementeras (i ett P) av kanalen, den andra, räknat antalet okvitterade slutföranden, ökades av kanalen och minskades av CPU:n. [Den andra semaforen som är positiv skulle höja motsvarande avbrottsflagga.]"
Dijkstra skrev om det obegränsade buffertfallet: "Vi betraktar två processer, som kallas 'producenten' respektive 'konsumenten'. Producenten är en cyklisk process och varje gång den går igenom sin cykel producerar den en viss del av informationen. som måste bearbetas av konsumenten. Konsumenten är också en cyklisk process och varje gång den går igenom sin cykel kan den bearbeta nästa del av information, som har producerats av producenten ... Vi antar att de två processerna kopplas för detta ändamål via en buffert med obegränsad kapacitet."
Han skrev om det begränsade buffertfallet: "Vi har studerat en producent och en konsument kopplade via en buffert med obegränsad kapacitet ... Relationen blir symmetrisk, om de två kopplas via en buffert av ändlig storlek, säg N portioner "
Och om det multipla producent-konsumentfallet: "Vi överväger ett antal producent/konsumentpar, där par i är kopplat via en informationsström som innehåller n i delar. Vi antar ... den ändliga bufferten som ska innehålla alla delar av alla strömmar att ha en kapacitet på "tot" portioner."
Per Brinch Hansen och Niklaus Wirth såg snart problemet med semaforer: "Jag har kommit till samma slutsats när det gäller semaforer, nämligen att de inte är lämpliga för språk på högre nivå. Istället är de naturliga synkroniseringshändelserna utbyten av meddelanden . "
Dijkstras avgränsade buffertlösning
Den ursprungliga semaforbegränsade buffertlösningen skrevs i ALGOL- stil. Bufferten kan lagra N portioner eller element. "Antal ködelar" -semaforen räknar de fyllda platserna i bufferten, "antal tomma positioner"-semaforen räknar de tomma platserna i bufferten och semaforen "buffertmanipulation" fungerar som mutex för buffertput- och hämta-operationerna. Om bufferten är full, det vill säga antalet tomma positioner är noll, kommer producenttråden att vänta i P(antal tomma positioner) operationen. Om bufferten är tom, det vill säga antalet ködelar är noll, kommer konsumenttråden att vänta i P(antal ködelar) operationen. V()-operationerna släpper semaforerna. Som en bieffekt kan en tråd flyttas från väntekön till klarkön. P()-operationen minskar semaforvärdet ner till noll. V()-operationen ökar semaforvärdet.
0
börja heltal antal ködelar , antal tomma positioner , buffertmanipulation ; _ _ _ _ antal köande delar : = ; antal tomma positioner : = N ; buffertmanipulation : = 1 ; parbegin producent : börja om 1 : producera nästa portion ; P ( antal tomma positioner ) ; _ P ( buffertmanipulation ) ; _ lägg till portion till bufferten ; V ( buffertmanipulation ) ; _ V ( antal köande delar ) ; _ goto igen 1 ände ; konsument : börja om 2 : P ( antal köportioner ) ; _ _ P ( buffertmanipulation ) ; _ ta del från bufferten ; V ( buffertmanipulation ) ; _ V ( antal tomma positioner ) ; _ processdelen tagits ; _ goto igen 2 ände parend slut
Från och med C++ 20 är semaforer en del av språket. Dijkstras lösning kan enkelt skrivas i modern C++. Variabeln buffer_manipulation är en mutex. Semaforfunktionen att förvärva i en tråd och släppa i en annan tråd behövs inte. Lock_guard()-satsen istället för ett lock()- och unlock()-par är C++ RAII . Lock_guard-förstöraren säkerställer låsfrigöring vid undantag. Denna lösning kan hantera flera konsumenttrådar och/eller flera producenttrådar.
0
#inkludera <tråd> #inkludera <mutex> #inkludera <semafor> std :: counting_semafor < N > antal_köportioner { }; std :: counting_semafor < N > antal_tomma_positioner { N }; std :: mutex buffert_manipulation ; void producer () { för (;;) { Portion portion = produce_next_portion (); antal_tomma_positioner . förvärva (); { std :: lock_guard < std :: mutex > g ( buffert_manipulation ); add_portion_to_buffer ( del ); } antal_köportioner . release (); } } void konsument () { för (;;) { number_of_queueing_portions . förvärva (); Portion portion ; { std :: lock_guard < std :: mutex > g ( buffert_manipulation ); portion = take_portion_from_buffer (); } antal_tomma_positioner . release (); process_portion_taken ( del ); } } int main () { std :: tråd t1 ( producent ); std :: tråd t2 ( konsument ); t1 . gå med (); t2 . gå med (); }
Använder bildskärmar
Per Brinch Hansen definierade monitorn: Jag kommer att använda termen monitor för att beteckna en delad variabel och uppsättningen av meningsfulla operationer på den. Syftet med en monitor är att styra schemaläggningen av resurser mellan enskilda processer enligt en viss policy. Tony Hoare lade en teoretisk grund för monitorn.
0
0
0
0
0
0
0 0 0
bounded buffer : monitor start buffer : array .. N - 1 av delen ; huvud , svans : .. N - 1 ; räkna : .. N ; nonempty , nonfull : condition ; procedur tillägg ( x : portion ) ; börja om count = N då icke full . vänta ; notera <= räkna < N ; buffert [ svans ] := x ; svans := svans ( + ) 1 ; räkna := räkna + 1 ; icke-tom . signal slut bifoga ; procedure borttagning ( resultat x : portion ) ; börja om räkna = då icke tom . vänta ; not < count <= N ; x := buffert [ huvud ] ; huvud := huvud ( + ) 1 ; count := count - 1 ; icke full . signal slut ta bort ; huvud := ; svans := ; räkna := ; ändbegränsad buffert ; _
Monitorn är ett objekt som innehåller variabler buffer
, head
, tail
och count
för att realisera en cirkulär buffert , villkorsvariablerna nonempty
och nonfull
för synkronisering och metoderna läggs till
och tar bort
för att komma åt den bounded bufferten. Monitoroperationen väntar motsvarar semaforoperationen P eller förvärv, signal motsvarar V eller släpp. Den inringade operationen (+) tas modulo N. Den presenterade pseudokoden i Pascal-stil visar en Hoare-monitor. En Mesa monitor använder while count
istället för if count
. En C++-version av programmeringsspråket är:
0
0
0
0 0 0
klass Bounded_buffer { Portionsbuffert [ N ] ; // 0..N-1 osignerat huvud , svans ; // 0..N-1 osignerad räkning ; // 0..N std :: condition_variable nonempty , nonfull ; std :: mutex mtx ; public : void append ( Portion x ) { std :: unikt_lås < std :: mutex > lck ( mtx ); icke full . vänta ( lck , [ & ] { return ! ( N == count ); }); hävda ( <= count && count < N ); buffert [ svans ++ ] = x ; svans %= N ; ++ räkna ; icke-tom . notify_one (); } Portion remove () { std :: unikt_lås < std :: mutex > lck ( mtx ); icke-tom . vänta ( lck , [ & ] { return ! ( == count ); }); hävda ( < count && count <= N ); Portion x = buffert [ huvud ++ ]; huvud %= N ; -- räkna ; icke full . notify_one (); returnera x ; } Bounded_buffer () { head = ; svans = ; räkna = ; } };
C++-versionen behöver en extra mutex av tekniska skäl. Den använder assert för att upprätthålla förutsättningarna för att lägga till och ta bort buffertoperationer.
Använder kanaler
Den allra första producent-konsumentlösningen i Electrologicas datorer använde "kanaler". Hoare definierade kanaler : Ett alternativ till explicit namngivning av källa och destination skulle vara att namnge en port genom vilken kommunikation ska ske. Portnamnen skulle vara lokala för processerna, och sättet på vilket par av portar ska kopplas ihop med kanaler skulle kunna deklareras i huvudet på ett parallellkommando. Brinch Hansen implementerade kanaler i programmeringsspråken Joyce och Super Pascal . Plan 9 operativsystemets programmeringsspråk Alef , Inferno operativsystems programmeringsspråk Limbo har kanaler. Följande C-källkod kompileras på Plan 9 från User Space :
0
0
#inkludera "uh" #inkludera "libc.h" #inkludera "thread.h" enum { STACK = 8192 }; void producent ( void * v ) { Channel * ch = v ; for ( uint i = 1 ; ; ++ i ) { sleep ( 400 ); print ( "p %d \n " , i ); sendul ( ch , i ); } } void konsument ( void * v ) { Channel * ch = v ; för (;;) { uint p = recvul ( ch ); print ( " \t\t c %d \n " , p ); sömn ( 200 + nrand ( 600 )); } } void threadmain ( int argc , char ** argv ) { int ( * mk )( void ( * fn )( void * ), void * arg , uint stack ); mk = trådskapa ; Kanal * ch = chancreate ( sizeof ( ulong ), 1 ); mk ( producent , ch , STACK ); mk ( konsument , ch , STAP ); recvp ( chancreate ( sizeof ( void * ), )); threadexitsall ( ); }
Programingångspunkten är vid funktion threadmain
. Funktionsanropet ch = chancreate(sizeof(ulong), 1)
skapar kanalen, funktionsanropet sendul(ch, i)
skickar ett värde till kanalen och funktionsanropet p = recvul(ch)
får ett värde från kanalen. Programmeringsspråket Go har också kanaler. Ett Go-exempel:
0
paketets huvudimport ( "fmt" "math/rand" "tid" ) var sendMsg = func produceMessage ( ) int { time . Sömn ( 400 * tid . Millisekunder ) sendMsg ++ fmt . Printf ( "sendMsg = %v\n" , sendMsg ) return sendMsg } func consumeMessage ( recvMsg int ) { fmt . Printf ( "\t\trecvMsg = %v\n" , recvMsg ) tid . Sömn ( tid . Varaktighet ( 200 + rand . Intn ( 600 )) * tid . Millisekund ) } func main () { ch := make ( chan int , 3 ) go func () { for { ch <- produceMessage () } }() för recvMsg := range ch { consumeMessage ( recvMsg ) } }
Go producent-konsument-lösningen använder huvud Go-rutinen för konsument och skapar en ny, namnlös Go-rutin för producenten. De två Go-rutinerna är kopplade till kanal 2 kap. Denna kanal kan köa upp till tre int-värden. Satsen ch := make(chan int, 3)
skapar kanalen, satsen ch <- produceMessage()
skickar ett värde till kanalen och satsen recvMsg := range ch
tar emot ett värde från kanalen. Tilldelningen av minnesresurser, allokeringen av bearbetningsresurser och synkroniseringen av resurser görs automatiskt av programmeringsspråket.
Utan semaforer eller monitorer
Leslie Lamport dokumenterade en avgränsad buffert producent-konsument lösning för en producent och en konsument: Vi antar att bufferten kan innehålla högst b meddelanden, b >= 1. I vår lösning låter vi k vara en konstant större än b, och låter s och r är heltalsvariabler som antar värden mellan 0 och k-1. Vi antar att initialt s=r och bufferten är tom. Genom att välja k att vara en multipel av b kan bufferten implementeras som en array B [0: b - 1]. Producenten lägger helt enkelt varje nytt meddelande i B[s mod b], och konsumenten tar varje meddelande från B[r mod b]. Algoritmen visas nedan, generaliserad för oändligt k.
0
Producent : L : om ( s - r ) mod k = b sedan gå till L fi ; lägg meddelande i buffert ; s := ( s + 1 ) mod k ; goto L ; Konsument : L : om ( s - r ) mod k = sedan goto L fi ; ta meddelande från buffert ; r := ( r + 1 ) mod k ; goto L ;
Lamport-lösningen använder upptagen väntan i tråden istället för att vänta i schemaläggaren. Denna lösning försummar effekten av schemaläggarens trådomkopplare vid obekväma tider. Om den första tråden har läst ett variabelvärde från minnet, växlar schemaläggaren till den andra tråden som ändrar variabelvärdet, och schemaläggaren växlar tillbaka till den första tråden, då använder den första tråden det gamla värdet på variabeln, inte det nuvarande värdet . Atomic read-modify-write löser detta problem. Modern C++ erbjuder atomära
variabler och operationer för flertrådsprogrammering. Följande upptagna väntande C++11-lösning för en producent och en konsument använder atomic read-modify-write operationer fetch_add
och fetch_sub
på atomic variabel count
.
0
0
0
0
enum { N = 4 }; Meddelandebuffert [ N ] ; std :: atomic < unsigned > count { }; void producer () { unsigned tail { }; för (;;) { Message message = produceMessage (); while ( N == antal ) ; // busy waiting buffer [ tail ++ ] = meddelande ; svans %= N ; räkna . fetch_add ( 1 , std :: memory_order_relaxed ); } } void konsument () { unsigned head { }; för (;;) { while ( == count ) ; // busy waiting Meddelandemeddelande = buffert [ head ++ ] ; huvud %= N ; räkna . fetch_sub ( 1 , std :: memory_order_relaxed ); consumeMessage ( meddelande ); } } int main () { std :: tråd t1 ( producent ); std :: tråd t2 ( konsument ); t1 . gå med (); t2 . gå med (); }
De cirkulära buffertindexvariablerna huvud
och svans
är trådlokala och därför inte relevanta för minneskonsistens. Det variabla antalet
styr producentens och konsumenttrådens upptagna väntan.
Se även
- Atomisk drift
- Design mönster
- FIFO
- Rörledning
- Kanal
- Implementering i Java: Java Message Service
Vidare läsning
- Mark Grand Patterns in Java, Volume 1, A Catalogue of Reusable Design Patterns Illustrated with UML
- C/C++ Users Journal (Dr.Dobb's) Januari 2004, "A C++ Producer-Consumer Concurrency Template Library", av Ted Yuan, är ett färdigt C++ mallbibliotek. Det lilla mallbibliotekets källkod och exempel finns här
- Ioan Tinca, Evolutionen av producent-konsumentproblemet i Java