Producent-konsument problem

Inom databehandling är producent -konsumentproblemet (även känt som bounded-buffer-problemet ) en familj av problem som beskrivits av Edsger W. Dijkstra sedan 1965.

Dijkstra hittade lösningen på producent-konsumentproblemet när han arbetade som konsult för Electrologica X1 och X8-datorerna: "Den första användningen av producent-konsument var delvis mjukvara, delvis hårdvara: Komponenten som skötte informationstransporten mellan butik och butik. kringutrustning kallades "en kanal" ... Synkronisering styrdes av två räkne semaforer i vad vi nu känner som producent/konsument arrangemang: den ena semaforen som indikerar längden på kön, ökades (i ett V) av CPU och dekrementeras (i ett P) av kanalen, den andra, räknat antalet okvitterade slutföranden, ökades av kanalen och minskades av CPU:n. [Den andra semaforen som är positiv skulle höja motsvarande avbrottsflagga.]"

Dijkstra skrev om det obegränsade buffertfallet: "Vi betraktar två processer, som kallas 'producenten' respektive 'konsumenten'. Producenten är en cyklisk process och varje gång den går igenom sin cykel producerar den en viss del av informationen. som måste bearbetas av konsumenten. Konsumenten är också en cyklisk process och varje gång den går igenom sin cykel kan den bearbeta nästa del av information, som har producerats av producenten ... Vi antar att de två processerna kopplas för detta ändamål via en buffert med obegränsad kapacitet."

Han skrev om det begränsade buffertfallet: "Vi har studerat en producent och en konsument kopplade via en buffert med obegränsad kapacitet ... Relationen blir symmetrisk, om de två kopplas via en buffert av ändlig storlek, säg N portioner "

Och om det multipla producent-konsumentfallet: "Vi överväger ett antal producent/konsumentpar, där par i är kopplat via en informationsström som innehåller n i delar. Vi antar ... den ändliga bufferten som ska innehålla alla delar av alla strömmar att ha en kapacitet på "tot" portioner."

Per Brinch Hansen och Niklaus Wirth såg snart problemet med semaforer: "Jag har kommit till samma slutsats när det gäller semaforer, nämligen att de inte är lämpliga för språk på högre nivå. Istället är de naturliga synkroniseringshändelserna utbyten av meddelanden . "

Dijkstras avgränsade buffertlösning

Den ursprungliga semaforbegränsade buffertlösningen skrevs i ALGOL- stil. Bufferten kan lagra N portioner eller element. "Antal ködelar" -semaforen räknar de fyllda platserna i bufferten, "antal tomma positioner"-semaforen räknar de tomma platserna i bufferten och semaforen "buffertmanipulation" fungerar som mutex för buffertput- och hämta-operationerna. Om bufferten är full, det vill säga antalet tomma positioner är noll, kommer producenttråden att vänta i P(antal tomma positioner) operationen. Om bufferten är tom, det vill säga antalet ködelar är noll, kommer konsumenttråden att vänta i P(antal ködelar) operationen. V()-operationerna släpper semaforerna. Som en bieffekt kan en tråd flyttas från väntekön till klarkön. P()-operationen minskar semaforvärdet ner till noll. V()-operationen ökar semaforvärdet.

         
       
          0
          
        
      
       
                  
                          
                        
                          
                        
                              
       
                   
                        
                          
                         
                          
                             
      
 börja  heltal  antal  ködelar  ,  antal  tomma  positioner  ,  buffertmanipulation  ;  _  _  _  _  antal  köande  delar  :  =  ;  antal  tomma  positioner  :  =  N  ;  buffertmanipulation  :  =  1  ;  parbegin  producent  :  börja  om  1  :  producera  nästa  portion  ;  P  (  antal  tomma  positioner  )  ;  _  P  (  buffertmanipulation  )  ;  _  lägg till  portion  till  bufferten  ;  V  (  buffertmanipulation  )  ;  _  V  (  antal  köande  delar  )  ;  _  goto  igen  1  ände  ;  konsument  :  börja  om  2  :  P  (  antal  köportioner  )  ;  _  _  P  (  buffertmanipulation  )  ;  _  ta  del  från  bufferten  ;  V  (  buffertmanipulation  )  ;  _  V  (  antal  tomma  positioner  )  ;  _  processdelen  tagits  ;  _  goto  igen  2  ände  parend  slut 

Från och med C++ 20 är semaforer en del av språket. Dijkstras lösning kan enkelt skrivas i modern C++. Variabeln buffer_manipulation är en mutex. Semaforfunktionen att förvärva i en tråd och släppa i en annan tråd behövs inte. Lock_guard()-satsen istället för ett lock()- och unlock()-par är C++ RAII . Lock_guard-förstöraren säkerställer låsfrigöring vid undantag. Denna lösning kan hantera flera konsumenttrådar och/eller flera producenttrådar.

 
 
 

 0
 
 

  
    
       
    
    
       
      
    
    
  


  
    
    
     
    
       
        
    
    
    
  


  
   
   
  
  
 #inkludera  <tråd>  #inkludera  <mutex>  #inkludera  <semafor>  std  ::  counting_semafor  <  N  >  antal_köportioner  {  };  std  ::  counting_semafor  <  N  >  antal_tomma_positioner  {  N  };  std  ::  mutex  buffert_manipulation  ;  void  producer  ()  {  för  (;;)  {  Portion  portion  =  produce_next_portion  ();  antal_tomma_positioner  .  förvärva  ();  {  std  ::  lock_guard  <  std  ::  mutex  >  g  (  buffert_manipulation  );  add_portion_to_buffer  (  del  );  }  antal_köportioner  .  release  ();  }  }  void  konsument  ()  {  för  (;;)  {  number_of_queueing_portions  .  förvärva  ();  Portion  portion  ;  {  std  ::  lock_guard  <  std  ::  mutex  >  g  (  buffert_manipulation  );  portion  =  take_portion_from_buffer  ();  }  antal_tomma_positioner  .  release  ();  process_portion_taken  (  del  );  }  }  int  main  ()  {  std  ::  tråd  t1  (  producent  );  std  ::  tråd  t2  (  konsument  );  t1  .  gå med  ();  t2  .  gå med  ();  } 

Använder bildskärmar

Per Brinch Hansen definierade monitorn: Jag kommer att använda termen monitor för att beteckna en delad variabel och uppsättningen av meningsfulla operationer på den. Syftet med en monitor är att styra schemaläggningen av resurser mellan enskilda processer enligt en viss policy. Tony Hoare lade en teoretisk grund för monitorn.

  
    0  
      0
     0
      
    
          
       0    
        
          
          
      
     
      
        0  
       0    
        
          
          
      
     
    0   0   0
  bounded  buffer  :  monitor  start  buffer  :  array  ..  N  -  1  av  delen  ;  huvud  ,  svans  :  ..  N  -  1  ;  räkna  :  ..  N  ;  nonempty  ,  nonfull  :  condition  ;  procedur  tillägg  (  x  :  portion  )  ;  börja  om  count  =  N  icke full  .  vänta  ;  notera  <=  räkna  <  N  ;  buffert  [  svans  ]  :=  x  ;  svans  :=  svans  (  +  )  1  ;  räkna  :=  räkna  +  1  ;  icke-tom  .  signal  slut  bifoga  ;  procedure  borttagning  (  resultat  x  :  portion  )  ;  börja  om  räkna  =  icke tom  .  vänta  ;  not  <  count  <=  N  ;  x  :=  buffert  [  huvud  ]  ;  huvud  :=  huvud  (  +  )  1  ;  count  :=  count  -  1  ;  icke full  .  signal  slut  ta bort  ;  huvud  :=  ;  svans  :=  ;  räkna  :=  ;  ändbegränsad  buffert  ;  _  

Monitorn är ett objekt som innehåller variabler buffer , head , tail och count för att realisera en cirkulär buffert , villkorsvariablerna nonempty och nonfull för synkronisering och metoderna läggs till och tar bort för att komma åt den bounded bufferten. Monitoroperationen väntar motsvarar semaforoperationen P eller förvärv, signal motsvarar V eller släpp. Den inringade operationen (+) tas modulo N. Den presenterade pseudokoden i Pascal-stil visar en Hoare-monitor. En Mesa monitor använder while count istället för if count . En C++-version av programmeringsspråket är:

  
       
      
          
    
   

     
     
          
    0      
      
      
    
    
  
    
     
       0   
    0      
       
       
    
    
     
  
   
      0   0   0
  
 klass  Bounded_buffer  {  Portionsbuffert  [  N  ]  ;  // 0..N-1  osignerat  huvud  ,  svans  ;  // 0..N-1  osignerad  räkning  ;  // 0..N  std  ::  condition_variable  nonempty  ,  nonfull  ;  std  ::  mutex  mtx  ;  public  :  void  append  (  Portion  x  )  {  std  ::  unikt_lås  <  std  ::  mutex  >  lck  (  mtx  );  icke full  .  vänta  (  lck  ,  [  &  ] {  return  !  (  N  ==  count  );  });  hävda  (  <=  count  &&  count  <  N  );  buffert  [  svans  ++  ]  =  x  ;  svans  %=  N  ;  ++  räkna  ;  icke-tom  .  notify_one  ();  }  Portion  remove  ()  {  std  ::  unikt_lås  <  std  ::  mutex  >  lck  (  mtx  );  icke-tom  .  vänta  (  lck  ,  [  &  ] {  return  !  (  ==  count  );  });  hävda  (  <  count  &&  count  <=  N  );  Portion  x  =  buffert  [  huvud  ++  ];  huvud  %=  N  ;  --  räkna  ;  icke full  .  notify_one  ();  returnera  x  ;  }  Bounded_buffer  ()  {  head  =  ;  svans  =  ;  räkna  =  ;  }  }; 

C++-versionen behöver en extra mutex av tekniska skäl. Den använder assert för att upprätthålla förutsättningarna för att lägga till och ta bort buffertoperationer.

Använder kanaler

Den allra första producent-konsumentlösningen i Electrologicas datorer använde "kanaler". Hoare definierade kanaler : Ett alternativ till explicit namngivning av källa och destination skulle vara att namnge en port genom vilken kommunikation ska ske. Portnamnen skulle vara lokala för processerna, och sättet på vilket par av portar ska kopplas ihop med kanaler skulle kunna deklareras i huvudet på ett parallellkommando. Brinch Hansen implementerade kanaler i programmeringsspråken Joyce och Super Pascal . Plan 9 operativsystemets programmeringsspråk Alef , Inferno operativsystems programmeringsspråk Limbo har kanaler. Följande C-källkod kompileras på Plan 9 från User Space :

 
 
 

     

   
     
         
    
     
     
  

   
     
    
       
     
      
  

     
        
    
      
    
    
   0
  0
 #inkludera  "uh"  #inkludera  "libc.h"  #inkludera  "thread.h"  enum  {  STACK  =  8192  };  void  producent  (  void  *  v  )  {  Channel  *  ch  =  v  ;  for  (  uint  i  =  1  ;  ;  ++  i  )  {  sleep  (  400  );  print  (  "p %d  \n  "  ,  i  );  sendul  (  ch  ,  i  );  }  }  void  konsument  (  void  *  v  )  {  Channel  *  ch  =  v  ;  för  (;;)  {  uint  p  =  recvul  (  ch  );  print  (  "  \t\t  c %d  \n  "  ,  p  );  sömn  (  200  +  nrand  (  600  ));  }  }  void  threadmain  (  int  argc  ,  char  **  argv  )  {  int  (  *  mk  )(  void  (  *  fn  )(  void  *  ),  void  *  arg  ,  uint  stack  );  mk  =  trådskapa  ;  Kanal  *  ch  =  chancreate  (  sizeof  (  ulong  ),  1  );  mk  (  producent  ,  ch  ,  STACK  );  mk  (  konsument  ,  ch  ,  STAP  );  recvp  (  chancreate  (  sizeof  (  void  *  ),  ));  threadexitsall  (  );  } 

Programingångspunkten är vid funktion threadmain . Funktionsanropet ch = chancreate(sizeof(ulong), 1) skapar kanalen, funktionsanropet sendul(ch, i) skickar ett värde till kanalen och funktionsanropet p = recvul(ch) får ett värde från kanalen. Programmeringsspråket Go har också kanaler. Ett Go-exempel:

 

 
	
	
	


   0

   
	  
	
	 
	 

   
	 
	  

  
	    
	  
		 
			  
		
	
	     
		
	
 paketets  huvudimport  (  "fmt"  "math/rand"  "tid"  )  var  sendMsg  =  func  produceMessage  (  )  int  {  time  .  Sömn  (  400  *  tid  .  Millisekunder  )  sendMsg  ++  fmt  .  Printf  (  "sendMsg = %v\n"  ,  sendMsg  )  return  sendMsg  }  func  consumeMessage  (  recvMsg  int  )  {  fmt  .  Printf  (  "\t\trecvMsg = %v\n"  ,  recvMsg  )  tid  .  Sömn  (  tid  .  Varaktighet  (  200  +  rand  .  Intn  (  600  ))  *  tid  .  Millisekund  )  }  func  main  ()  {  ch  :=  make  (  chan  int  ,  3  )  go  func  ()  {  for  {  ch  <-  produceMessage  ()  }  }()  för  recvMsg  :=  range  ch  {  consumeMessage  (  recvMsg  )  }  } 

Go producent-konsument-lösningen använder huvud Go-rutinen för konsument och skapar en ny, namnlös Go-rutin för producenten. De två Go-rutinerna är kopplade till kanal 2 kap. Denna kanal kan köa upp till tre int-värden. Satsen ch := make(chan int, 3) skapar kanalen, satsen ch <- produceMessage() skickar ett värde till kanalen och satsen recvMsg := range ch tar emot ett värde från kanalen. Tilldelningen av minnesresurser, allokeringen av bearbetningsresurser och synkroniseringen av resurser görs automatiskt av programmeringsspråket.

Utan semaforer eller monitorer

Leslie Lamport dokumenterade en avgränsad buffert producent-konsument lösning för en producent och en konsument: Vi antar att bufferten kan innehålla högst b meddelanden, b >= 1. I vår lösning låter vi k vara en konstant större än b, och låter s och r är heltalsvariabler som antar värden mellan 0 och k-1. Vi antar att initialt s=r och bufferten är tom. Genom att välja k att vara en multipel av b kan bufferten implementeras som en array B [0: b - 1]. Producenten lägger helt enkelt varje nytt meddelande i B[s mod b], och konsumenten tar varje meddelande från B[r mod b]. Algoritmen visas nedan, generaliserad för oändligt k.


               
         
            
       

           0    
         
            
        Producent  :  L  :  om  (  s  -  r  )  mod  k  =  b  sedan  gå till  L  fi  ;  lägg  meddelande  i  buffert  ;  s  :=  (  s  +  1  )  mod  k  ;  goto  L  ;  Konsument  :  L  :  om  (  s  -  r  )  mod  k  =  sedan  goto  L  fi  ;  ta  meddelande  från  buffert  ;  r  :=  (  r  +  1  )  mod  k  ;  goto  L  ; 

Lamport-lösningen använder upptagen väntan i tråden istället för att vänta i schemaläggaren. Denna lösning försummar effekten av schemaläggarens trådomkopplare vid obekväma tider. Om den första tråden har läst ett variabelvärde från minnet, växlar schemaläggaren till den andra tråden som ändrar variabelvärdet, och schemaläggaren växlar tillbaka till den första tråden, då använder den första tråden det gamla värdet på variabeln, inte det nuvarande värdet . Atomic read-modify-write löser detta problem. Modern C++ erbjuder atomära variabler och operationer för flertrådsprogrammering. Följande upptagna väntande C++11-lösning för en producent och en konsument använder atomic read-modify-write operationer fetch_add och fetch_sub på atomic variabel count .

    
 
  0
  
    0
    
       
        
       
      
      
     
  

  
    0
    
     0   
       
       
      
     
    
  

  
   
   
  
  
 enum  {  N  =  4  };  Meddelandebuffert  [  N  ]  ;  std  ::  atomic  <  unsigned  >  count  {  };  void  producer  ()  {  unsigned  tail  {  };  för  (;;)  {  Message  message  =  produceMessage  ();  while  (  N  ==  antal  )  ;  // busy waiting  buffer  [  tail  ++  ]  =  meddelande  ;  svans  %=  N  ;  räkna  .  fetch_add  (  1  ,  std  ::  memory_order_relaxed  );  }  }  void  konsument  ()  {  unsigned  head  {  };  för  (;;)  {  while  (  ==  count  )  ;  // busy waiting  Meddelandemeddelande  =  buffert  [  head  ++  ]  ;  huvud  %=  N  ;  räkna  .  fetch_sub  (  1  ,  std  ::  memory_order_relaxed  );  consumeMessage  (  meddelande  );  }  }  int  main  ()  {  std  ::  tråd  t1  (  producent  );  std  ::  tråd  t2  (  konsument  );  t1  .  gå med  ();  t2  .  gå med  ();  } 

De cirkulära buffertindexvariablerna huvud och svans är trådlokala och därför inte relevanta för minneskonsistens. Det variabla antalet styr producentens och konsumenttrådens upptagna väntan.

Se även

Vidare läsning