Sample Python multiprocessing.connection server

Status
Not open for further replies.

jbrearley

Programmer
Nov 26, 2012
9
CA
For those of you learning the multiprocessing and multiprocessing.connection modules, here is a sample server that launches child processes in parallel, echoes messages back to the client, and stresses the CPUs on demand.

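BTW, you MUST use multiprocessing.connection.Client to connect to the server. Raw sockets cause a MemoryError on the server, because they do not send the message length prefix that the server expects. See the bug report cited in the comments at the top of the server code.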
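
To illustrate why unframed bytes upset the server, here is a small sketch. It assumes the 4-byte big-endian length header that, as far as I know, CPython's connection code uses for ordinary-sized messages. The helper names frame and claimed_length are made up for this example and are not part of multiprocessing. It also does not reproduce the pickling that send() performs on top of the framing.

```python
import struct

def frame(payload: bytes) -> bytes:
    # Hypothetical helper: prefix the payload with its length,
    # packed as a 4-byte big-endian signed integer.
    return struct.pack("!i", len(payload)) + payload

def claimed_length(raw: bytes) -> int:
    # Hypothetical helper: the message size a length-prefixed reader
    # would believe, based on the first 4 bytes it receives.
    return struct.unpack("!i", raw[:4])[0]

if __name__ == "__main__":
    # Properly framed data reports the real payload length.
    print(claimed_length(frame(b"hello")))
    # Unframed data: the first 4 text bytes are misread as a length.
    print(hex(claimed_length(b"hello")))
```

With unframed text, the first four characters are read as a length of well over a gigabyte, which would explain a reader failing with MemoryError when it tries to allocate a buffer of that size.
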
Python:
# Written by John Brearley,  Jan 2013

# 2013/1/10 submitted bug to: http://bugs.python.org/issue16920
# Answer: MUST use multiprocessing client! It has msg length prefix in data.

import multiprocessing,multiprocessing.connection,re,sys,threading,time

rc_shutdown=3 ;# return code to signal server shutdown
srv_ctl=1 ;# flag to control while loops

# Routine handles new incoming sockets as a separate process.
def handle_socket(l,q):
   name=multiprocessing.current_process().name
   print("handle_socket name:",name,"waiting for socket")
   # print("handle_socket l:",l,"q:",q)
   # print_data.print_data(l)  ;# for learning purposes
   # print_data.print_data(q)

   # code blocks here waiting for incoming socket.
   conn=l.accept() 
   # print("handle_socket name:",name,"conn:",conn)
   # print_data.print_data(conn) ;# for learning purposes
   # print("handle_socket name:",name,"fileno:",conn.fileno())
   # How to get getsockname() getpeername() ?

   # Tell the server to launch another child process.
   # We don't do it here, as it would become a child of this
   # process and would not be visible to the server as an independent
   # process. Sub-child processes seem to be hidden from the parent.
   q.put("launch") ;# tells the server to launch another process

   # Here conn.poll() does work. However, since this is a server, its job is
   # to sit and wait, so blocking mode is fine. There may be cases where you
   # want to detect the remote client going to sleep, being slow, so you can
   # implement timeout loops using conn.poll(x)
   # rc=conn.poll(0.5)
   # print("handle_socket poll rc:",rc)

   # New incoming socket is in blocking mode, no supported option for non-blocking.
   print("handle_socket name:",name,"waiting for data")
   rc=0
   while (1):
      # Wait for data
      try:
         data=conn.recv()
      except:
         msg=sys.exc_info()[1]
         msg=str(msg)
         if (msg == ""):
            print("handle_socket recv on name:",name,"null msg ==> socket closing!","msg:",msg)
         else:
            print("handle_socket recv on name:",name,"ERROR: ==> socket closing!","msg:",msg)
            rc=1
         break

      # Did we get any data?
      dr=len(data) ;# received data length
      if (dr == 0):
         print("handle_socket recv on name:",name,"no data ==> socket closing!")
         break

      # Send back the received data
      print("handle_socket send on name:",name,"data:",data)
      conn.send(data) ;# Does NOT return data length like base socket does

      # Check for commands in received data
      # NB: str chokes on null data!!!
      # str() will be covered by dr==0 check above.
      try:
         temp=str(data,"ASCII") 
      except:
         print("handle_socket name:",name,"ERROR:",sys.exc_info()[1])
         temp=""
      # print("handle_socket name:",name,"temp:",temp)

      # Look for stress command, with optional number of seconds.
      # Drive CPU to 100% usage. Helps prove that tasks are distributed across available CPUs.
      # Raw string avoids invalid escape sequence warnings for \s and \d.
      m=re.search(r"stress\s*(\d*)",temp,re.I)
      if (m):
         try:
            delay=int(m.group(1)) ;# use optional number from stress command
         except ValueError:
            delay=10 ;# no number given, default to 10 seconds
         print("handle_socket creating CPU stress for:",delay,"seconds, cmd from:",name)
         start_sec=time.time()
         elapsed=0 ;# separate variable, so temp still holds the command text
         while (elapsed < delay):
            elapsed=time.time()-start_sec
         print("handle_socket done CPU stress for:",delay,"seconds, cmd from:",name)

      # Look for shutdown command.
      if (temp == "shutdown"):
         print("handle_socket Shutting down server, cmd from:",name)
         rc=rc_shutdown ;# Main server loop looks at exitcode of child process
         break

   # Close socket, give appropriate return code.
   # sys.exit() is used, as the exit() builtin is only provided by the site module.
   conn.close()
   sys.exit(rc)

def monitor_stdin():
   global srv_ctl

   # Main process shows sys.stdin is connected to stdin. BUT, a child
   # process shows null! A thread also has access to stdin.
   # print_data.print_data(sys.stdin)

   # Check keyboard/stdin on terminal window for shutdown command.
   # Done as a separate thread as I can't find a way to make stdin
   # non-blocking. On Windows, select only works on sockets, not stdin.
   print("monitor_stdin looking for keyboard input")
   while (srv_ctl):
      line=input() ;# blocking call
      # line=sys.stdin.readline() ;# blocking call
      line=line.strip()
      if (line == ""):
         # Most of the time there will be no stdin data available.
         pass
      elif (line == "shutdown"):
         print("monitor_stdin shutting down server, cmd from keyboard")
         srv_ctl=0 ;# Stops main server loop and this local loop
         break
      else:
         print("monitor_stdin WARNING: ignoring:",line)
         print("monitor_stdin To shutdown the server process, type: shutdown")
      time.sleep(1)
   exit()

# Main program
if (__name__ == "__main__"):

   # Create a new multiprocessing connection listener object
   port=10500
   host="" ;# "" means use all addresses available
   l=multiprocessing.connection.Listener((host,port))
   # print_data.print_data(l) ;# for learning purposes
   # Listener has no methods listen, setblocking or fileno
   print("\nmain server listening on host",host,"port",port)

   # Code is designed around the fact that the listener object is blocking only!
   # method poll() is for Client only, not Listener!
   # rc=l.poll(0.5) ;# test code, fails!
   # print("main poll rc:",rc)

   # The issue with the first child process launching another child process is that
   # the first child process will not terminate until all its child processes have
   # also terminated. Also, the sub-child processes do not appear to be visible to
   # the main process. To avoid these issues, we ensure that the main process always
   # launches the child process.

   # Since we only want one child process doing an accept() at any given time, we need
   # the child process to be able to signal the server process when it's time to launch
   # another process. This is done using the multiprocessing queue.
   q=multiprocessing.Queue()
   # print_data.print_data(q) ;# for learning purposes
   q.put("launch") ;# tells the server to launch the first process
   time.sleep(0.2)
   # print("main qsize:",q.qsize(),"empty:",q.empty())

   # Keep track of all children launched. While the method multiprocessing.active_children()
   # will give us the currently active children, it won't tell us about children that just terminated.
   # So if we want to check on the exitcode of a recently terminated child, we need to have our
   # own child list that we can go look at.
   child_list=[]

   # NB: We could use the queue to pass back a shutdown command. However, I chose to use
   # child process return codes to signal the shutdown command.

   # We also watch the terminal window keyboard/stdin for the shutdown command.
   # This is done as a separate thread, as I can't find a non-blocking way to read
   # stdin on Windows. A thread has access to stdin, but a child process does not.
   t=threading.Thread(target=monitor_stdin);# create new thread
   # Keep thread object separate, as thread has no exitcode to monitor or terminate method
   t.start() ;# don't use t.run(), run forces serial mode, not parallel!
   print("main created t.ident:",t.ident,"t.name:",t.name)

   # Main server loop
   while (srv_ctl):

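      # Get next item from queue. By default, q.get will wait forever if no work is available!
      # We add a 3 sec timeout so the loop keeps polling the children below.
      # Catch Exception rather than everything, so CTRL-C is not swallowed here.
      try:
         task=q.get(True,3)
      except Exception: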
         task="<Empty>"
      # print("main q task:",task)

      # If necessary, launch a handle_socket with access to listener & queue objects.
      if (task == "launch"):
         # NB: args=(...,) must have at least one comma! Even if only 1 arg!
         obj=multiprocessing.Process(target=handle_socket, args=(l,q));# create new process
         # print("main created process",obj)
         child_list.append(obj) ;# save the child object
         obj.start() ;# don't use obj.run(), run forces serial mode, not parallel!
         # child pid is stored in object as ident & pid, won't have a useful value until process actually starts
         print("main created obj.pid:",obj.pid,"obj.name:",obj.name)

      # Poll existing child processes.
      # print("main child_list:",child_list)
      # Iterate over a copy, as finished children are removed from child_list
      # inside the loop. Deleting by index while enumerating skips entries.
      for child in list(child_list):
         # print("main polling child:",child)
         if (not child.is_alive()):
            # child process has finished, get exitcode
            rc=child.exitcode
            print("main child:",child.name,"done, rc:",rc)
            if (rc is not None):
               # Forget this child, we have now got its exitcode
               child_list.remove(child)
               # Child process return code can signal shutdown for server.
               if (rc == rc_shutdown):
                  print("main shutting down server, cmd from:",child.name)
                  srv_ctl=0 ;# stops main while loop & monitor_stdin while loop

   # Shutdown server, terminate any remaining children processes.
   l.close()
   for child in child_list:
      if (child.is_alive()):
         child.terminate()
         time.sleep(0.2)
         print("main child:",child.name,"terminated rc:",child.exitcode)

   # End main program. Prompt user, wait for thread to terminate.
   if (t.is_alive()):
      print("\nmain hit return to end the keyboard monitor thread.")
      while (t.is_alive()):
         time.sleep(1)
   print("\nmain all done.")
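
If you just want to see the Listener/Client pairing without the process management above, here is a minimal sketch of one echo round trip inside a single program. It is not part of the original server. It assumes a thread stands in for the child process, and that binding to port 0 lets the OS pick a free port, which is then read back from listener.address. The function names echo_once and round_trip are made up for this example.

```python
import threading
from multiprocessing.connection import Client, Listener

def echo_once(listener):
    # Accept a single connection and echo every message back
    # until the peer closes its end.
    with listener.accept() as conn:
        while True:
            try:
                data = conn.recv()
            except (EOFError, OSError):
                break  # client closed the connection
            conn.send(data)

def round_trip(payload):
    # Port 0 asks the OS for any free port; listener.address
    # reports the (host, port) pair that was actually bound.
    with Listener(("localhost", 0)) as listener:
        server = threading.Thread(target=echo_once, args=(listener,))
        server.start()
        with Client(listener.address) as client:
            client.send(payload)
            reply = client.recv()
        server.join()
    return reply

if __name__ == "__main__":
    print(round_trip(b"hello"))
```

Both ends use the multiprocessing.connection classes, so the message framing matches and recv() hands back exactly what send() was given.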
 
Here is the corresponding client code:

Python:
# Updated by John Brearley,  Jan 2013

# multiprocessing.connection client program

# 2013/1/13 poll bug submitted http://bugs.python.org/issue16955

import multiprocessing.connection,re,sys

# Get command line parameters, if any, for desired host & port.
host="localhost" ;# default host
argc=len(sys.argv)
if (argc > 1):
   temp=sys.argv[1]
   temp=temp.strip()
   if (temp != ""):
      host=temp
port=10500 ;# default port
if (argc > 2):
   temp=sys.argv[2]
   temp=temp.strip()
   if (re.search(r"^\d+$",temp)):
      port=int(temp) ;# int() gives error on null string
# print("argc:",argc,"host:",host,"port:",port)

# Create new client object
print("\nConnecting to host:",host,"port:",port)
try:
   cl=multiprocessing.connection.Client((host,port))
except:
   print("\nERROR0:",sys.exc_info()[1])
   exit(1)
# print_data.print_data(cl) ;# for learning purposes

# MUST use default client blocking mode! There is no setblocking() method!

# Show socket info.
# Fileno is unique only to this instance of python.
# Fileno is often reused by other concurrently running python programs.
# print("fileno:",cl.fileno())
# How to do getsockname() getpeername() ?

# Process user input.
while (1):
   cmd=input("cmd:")
   cmd=cmd.strip()
   # print("cmd:",cmd)
   if (cmd == "q"):
      break

   # Send cmd, wait for response.
   # NB: To force the server to become non-responsive, in the server window
   # type CTRL-S ("xoff") several times. This allows you to test the error
   # handling code below.
   # NB: sending a null byte string doesn't send anything to server. If you
   # turn off the cmd.strip and the if below, then you can send & receive
   # a single space or more.
   if (cmd != ""):
      timeout=1 ;# error flag
      try:
         cl.send(bytes(cmd,"ASCII"))
      except:
         print("\nERROR1:",sys.exc_info()[1])
         exit(1)

      # The for loop provides a poll/wait timeout for blocking operations.
      for i in range(16):
         # poll(x) waits for x seconds for input to be available before returning.
         # This is how you deal with timeouts for a blocking client object.
         try: 
            # readable=cl.poll(0.2) ;# broken on Windows, see http://bugs.python.org/issue16955
            readable=bool(multiprocessing.connection.wait([cl],0.2)) ;# Work around for issue 16955
            # print("i:",i,"readable:",readable)
         except:
            print("\nERROR2:",sys.exc_info()[1])
            exit(1)

         if (readable):
            try:
               # cl.close() ;# test code
               resp=cl.recv()
               print("i:",i,"resp:",resp)
            except:
               print("\nERROR3:",sys.exc_info()[1])
               exit(1)

            timeout=0 ;# we got some data
            break

      if (timeout):
         print("ERROR4: Timeout for cmd:",cmd)

# Clean up.
cl.close()
print("\nAll done!")
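
The poll/wait timeout loop in the client can be folded into a small helper. This is only a sketch. The name recv_with_timeout is made up for this example, and it uses multiprocessing.Pipe() purely to get a connected pair of connection objects without starting a server.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait

def recv_with_timeout(conn, timeout, default=None):
    # wait() returns the connections that are ready to read, or an
    # empty list once the timeout (in seconds) has expired.
    if wait([conn], timeout):
        return conn.recv()
    return default

if __name__ == "__main__":
    left, right = Pipe()
    print(recv_with_timeout(left, 0.1))   # nothing sent yet, gives the default
    right.send(b"ping")
    print(recv_with_timeout(left, 0.1))   # data is waiting, so recv() runs
```

A single call with a longer timeout should behave much like the 16 x 0.2 second loop above, since wait() returns as soon as data is available.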
 