For those of you learning the multiprocessing and multiprocessing.connection modules, here is a sample server that launches parallel child processes, echoes messages, and stresses the CPUs on demand.
BTW, you MUST use multiprocessing.connection.Client to connect to the server. Raw sockets cause a MemoryError; see http://bugs.python.org/issue16920
Python:
# Written by John Brearley, Jan 2013
# 2013/1/10 submitted bug to: http://bugs.python.org/issue16920
# Answer: MUST use multiprocessing client! It has msg length prefix in data.
import multiprocessing,multiprocessing.connection,re,sys,threading,time
# Exit code a child process returns to signal a server shutdown.
rc_shutdown = 3
# Flag controlling the while loops (main server loop and stdin monitor);
# setting it to 0 stops them.
srv_ctl = 1
# Routine handles new incoming sockets as a separate process.
# Matches the stress command with its optional number of seconds.
_STRESS_RE = re.compile(r"stress\s*(\d*)", re.I)


def _command_text(data, name):
    """Return *data* as text for command matching, or "" if it is not text.

    str messages are used as-is and bytes-like messages are decoded as ASCII;
    any other object (or undecodable bytes) carries no command.
    """
    if isinstance(data, str):
        return data
    if isinstance(data, (bytes, bytearray)):
        try:
            return str(data, "ASCII")
        except UnicodeDecodeError as err:
            print("handle_socket name:", name, "ERROR:", err)
    return ""


def handle_socket(l, q):
    """Serve one client connection in a child process, then exit.

    Blocks in ``l.accept()`` until a client connects, then puts "launch" on
    *q* so the main server starts the next acceptor process. The launch is
    delegated to the server because a process started from here would be a
    child of this process rather than of the server.

    Every received message is echoed back. Messages are also scanned for:
      * "stress [N]" (case-insensitive, anywhere in the text): busy-loop
        for N seconds (default 10) to drive one CPU to 100%.
      * "shutdown" (exact text): stop serving and ask the server to exit.

    Args:
        l: listener object; only ``accept()`` is used.
        q: queue shared with the main server; only ``put()`` is used.

    Exits via ``sys.exit`` with 0 after a clean close, 1 after a receive
    error, or ``rc_shutdown`` when the client requested a server shutdown.
    """
    name = multiprocessing.current_process().name
    print("handle_socket name:", name, "waiting for socket")

    # Code blocks here waiting for an incoming connection.
    conn = l.accept()

    # Tell the server to launch another child process.
    q.put("launch")

    # The connection is blocking, which is fine for a server. Use
    # conn.poll(x) if timeouts on slow or sleeping clients are ever needed.
    print("handle_socket name:", name, "waiting for data")
    rc = 0
    try:
        while True:
            # Wait for data.
            # SECURITY NOTE: Connection.recv() unpickles whatever the peer
            # sends. Only expose this server to trusted clients (or
            # configure an authkey on the Listener).
            try:
                data = conn.recv()
            except Exception as err:
                msg = str(err)
                if msg == "":
                    # EOFError carries no message: the peer closed normally.
                    print("handle_socket recv on name:", name, "null msg ==> socket closing!", "msg:", msg)
                else:
                    print("handle_socket recv on name:", name, "ERROR: ==> socket closing!", "msg:", msg)
                    rc = 1
                break

            # An empty message means the client is done. Objects without a
            # length (e.g. ints) are never "empty" and are simply echoed.
            try:
                is_empty = len(data) == 0
            except TypeError:
                is_empty = False
            if is_empty:
                print("handle_socket recv on name:", name, "no data ==> socket closing!")
                break

            # Send back the received data. send() does NOT return the data
            # length like a base socket does.
            print("handle_socket send on name:", name, "data:", data)
            conn.send(data)

            # Check for commands in the received data.
            text = _command_text(data, name)

            # Stress command: proves tasks are spread across available CPUs.
            m = _STRESS_RE.search(text)
            if m:
                # Use the optional number from the stress command.
                delay = int(m.group(1)) if m.group(1) else 10
                print("handle_socket creating CPU stress for:", delay, "seconds, cmd from:", name)
                deadline = time.monotonic() + delay
                while time.monotonic() < deadline:
                    pass  # deliberate busy-wait: the point is to burn CPU
                print("handle_socket done CPU stress for:", delay, "seconds, cmd from:", name)

            # Shutdown command.
            if text == "shutdown":
                print("handle_socket Shutting down server, cmd from:", name)
                rc = rc_shutdown  # main server loop looks at the child's exitcode
                break
    finally:
        # Close the connection even if send() failed mid-conversation.
        conn.close()

    # Give the appropriate return code.
    sys.exit(rc)
def monitor_stdin():
    """Watch the terminal keyboard (stdin) for the "shutdown" command.

    Meant to run as a thread of the main process: reading stdin blocks,
    and (per the author's notes) a child process does not see the
    terminal's stdin while a thread does. On Windows, select only works on
    sockets, so stdin cannot be polled without blocking.

    Typing "shutdown" clears the global ``srv_ctl`` flag, which stops both
    the main server loop and this loop. Blank lines are ignored; any other
    input prints a usage hint. If stdin is closed (EOF), monitoring simply
    stops and the server keeps running.
    """
    global srv_ctl

    print("monitor_stdin looking for keyboard input")
    while srv_ctl:
        try:
            line = input().strip()  # blocking call
        except EOFError:
            # stdin closed or redirected from an empty source: no keyboard
            # to watch, so end the thread quietly instead of crashing.
            print("monitor_stdin stdin closed, keyboard monitor stopping")
            return

        if line == "shutdown":
            print("monitor_stdin shutting down server, cmd from keyboard")
            srv_ctl = 0  # stops main server loop and this local loop
            return

        # Most of the time there will be no stdin data available.
        if line != "":
            print("monitor_stdin WARNING: ignoring:", line)
            print("monitor_stdin To shutdown the server process, type: shutdown")
        time.sleep(1)
# Main program
if __name__ == "__main__":
    # Local import: only this main loop needs the queue.Empty exception.
    import queue

    # Create a new multiprocessing connection listener object.
    port = 10500
    host = ""  # empty string means use all addresses available
    l = multiprocessing.connection.Listener((host, port))
    # Listener has no methods listen, setblocking or fileno.
    print("\nmain server listening on host", host, "port", port)

    # Code is designed around the fact that the listener object is blocking
    # only! Method poll() is for Client only, not Listener.

    # The issue with the first child process launching another child
    # process is that the first child will not terminate until all its own
    # children have also terminated. Also, the sub-children do not appear
    # to be visible to the main process. To avoid these issues, the main
    # process always launches the child processes.

    # Since we only want one child process doing an accept() at any given
    # time, the child must be able to signal the server when it is time to
    # launch another process. This is done using the multiprocessing queue.
    q = multiprocessing.Queue()
    q.put("launch")  # tells the server to launch the first process
    time.sleep(0.2)

    # Keep track of all children launched. multiprocessing.active_children()
    # gives the currently active children, but it won't tell us about
    # children that just terminated. To check the exitcode of a recently
    # terminated child we need our own list.
    child_list = []

    # NB: The queue could pass back a shutdown command; instead, child
    # process return codes are used to signal the shutdown command.

    # Also watch the terminal keyboard/stdin for the shutdown command. This
    # is done as a separate thread, as there is no non-blocking way to read
    # stdin on Windows. A thread has access to stdin, a child process does
    # not. The thread object is kept separate from child_list because a
    # thread has no exitcode to monitor and no terminate method.
    t = threading.Thread(target=monitor_stdin)  # create new thread
    t.start()  # don't use t.run(): run forces serial mode, not parallel!
    print("main created t.ident:", t.ident, "t.name:", t.name)

    # Main server loop.
    while srv_ctl:
        # Get the next item from the queue. By default q.get waits forever
        # if no work is available, so use a 3 sec timeout to keep polling
        # the children and the srv_ctl flag.
        try:
            task = q.get(True, 3)
        except queue.Empty:
            task = "<Empty>"

        # If necessary, launch a handle_socket with access to the listener
        # and queue objects.
        if task == "launch":
            # NB: args=(...,) must have at least one comma, even for 1 arg!
            obj = multiprocessing.Process(target=handle_socket, args=(l, q))
            child_list.append(obj)  # save the child object
            obj.start()  # don't use obj.run(): run forces serial mode!
            # The child pid has no useful value until the process starts.
            print("main created obj.pid:", obj.pid, "obj.name:", obj.name)

        # Poll existing child processes. The list is rebuilt rather than
        # deleted from while iterating, which would skip the entry that
        # follows each removed child.
        still_tracked = []
        for child in child_list:
            if child.is_alive():
                still_tracked.append(child)
                continue
            # Child process has finished, get its exitcode.
            rc = child.exitcode
            print("main child:", child.name, "done, rc:", rc)
            if rc is None:
                # No exitcode yet; keep the child so we can look again.
                still_tracked.append(child)
                continue
            # We have its exitcode, so this child is forgotten. The return
            # code can signal shutdown for the server.
            if rc == rc_shutdown:
                print("main shutting down server, cmd from:", child.name)
                srv_ctl = 0  # stops main loop & monitor_stdin loop
        child_list = still_tracked

    # Shutdown server, terminate any remaining child processes.
    l.close()
    for child in child_list:
        if child.is_alive():
            child.terminate()
            child.join(1)  # wait for the exit so exitcode is populated
            print("main child:", child.name, "terminated rc:", child.exitcode)

    # End main program. The monitor thread is blocked reading stdin, so
    # prompt the user and wait for the thread to terminate.
    if t.is_alive():
        print("\nmain hit return to end the keyboard monitor thread.")
    t.join()
    print("\nmain all done.")