Background
I have two Python processes that need to communicate with each other. The communication is handled by a class named Pipe. I made a separate class for this because most of the information that needs to be communicated comes in the form of dictionaries, so Pipe implements a pretty simple protocol for doing this.
Here is the Pipe constructor:
    def __init__(self,sPath):
        """
        create the fifo. if it already exists just associate with it
        """
        self.sPath = sPath
        if not os.path.exists(sPath):
            try:
                os.mkfifo(sPath)
            except:
                raise Exception('cannot mkfifo at path \n {0}'.format(sPath))
        self.iFH = os.open(sPath,os.O_RDWR | os.O_NONBLOCK)
        self.iFHBlocking = os.open(sPath,os.O_RDWR)
So ideally I would just construct a Pipe in each process with the same path and they would be able to talk to each other.
I'm going to skip the details of the protocol because I think they are largely unnecessary here.
All read and write operations make use of the following 'base' functions:
    def base_read_blocking(self,iLen):
        self.lock()
        lBytes = os.read(self.iFHBlocking,iLen)
        self.unlock()
        return lBytes

    def base_read(self,iLen):
        print('entering base read')
        self.lock()
        lBytes = os.read(self.iFH,iLen)
        self.unlock()
        print('exiting base read')
        return lBytes

    def base_write_blocking(self,lBytes):
        self.lock()
        safe_write(self.iFHBlocking,lBytes)
        self.unlock()

    def base_write(self,lBytes):
        print('entering base write')
        self.lock()
        safe_write(self.iFH,lBytes)
        self.unlock()
        print('exiting base write')
safe_write was suggested in another post:
    def safe_write(*args, **kwargs):
        while True:
            try:
                return os.write(*args, **kwargs)
            except OSError as e:
                if e.errno == 35:
                    import time
                    print(".")
                    time.sleep(0.5)
                else:
                    raise
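One thing I noticed while writing this up: 35 is the value of EAGAIN on macOS/BSD, whereas on Linux EAGAIN is 11 and 35 means something else. Here is a minimal sketch of the same retry loop using the symbolic constants from the errno module instead of a hard-coded number. The name safe_write_portable and the delay parameter are made up for illustration and are not from the other post.

```python
import errno
import os
import time


def safe_write_portable(fd, data, delay=0.5):
    """Retry os.write for as long as the descriptor reports 'try again'."""
    while True:
        try:
            return os.write(fd, data)
        except OSError as e:
            # errno.EAGAIN is 11 on Linux and 35 on macOS/BSD;
            # EWOULDBLOCK is normally the same value as EAGAIN.
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                time.sleep(delay)
            else:
                raise
```

It takes the same arguments as os.write(fd, data) and returns the number of bytes written.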
Locking and unlocking are handled like this:
    def lock(self):
        print('locking...')
        while True:
            try:
                os.mkdir(self.get_lock_dir())
                print('...locked')
                return
            except OSError as e:
                if e.errno != 17:
                    raise e

    def unlock(self):
        try:
            os.rmdir(self.get_lock_dir())
        except OSError as e:
            if e.errno != 2:
                raise e
        print('unlocked')
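For readers who do not know the numbers by heart: 17 is EEXIST (the lock directory already exists) and 2 is ENOENT (it does not exist). A standalone sketch of the same mkdir-based lock with named constants follows. The function names lock_dir and unlock_dir and the poll parameter are hypothetical, and the short sleep is an addition so the wait is not a tight busy loop.

```python
import errno
import os
import time


def lock_dir(path, poll=0.01):
    """Spin until os.mkdir succeeds; only one caller can create the directory."""
    while True:
        try:
            os.mkdir(path)
            return
        except OSError as e:
            if e.errno != errno.EEXIST:  # anything other than "already locked"
                raise
            time.sleep(poll)  # someone else holds the lock; wait and retry


def unlock_dir(path):
    """Release the lock; a missing directory is treated as already unlocked."""
    try:
        os.rmdir(path)
    except OSError as e:
        if e.errno != errno.ENOENT:  # anything other than "no such directory"
            raise
```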
The Problem
This sometimes happens:
      ....in base_read
        lBytes = os.read(self.iFH,iLen)
    OSError: [Errno 11] Resource temporarily unavailable
Sometimes it's fine.
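For what it's worth, the same errno seems to be reproducible with no second process and no locking at all. A minimal sketch, assuming a Unix system with os.mkfifo and a throwaway FIFO in a temporary directory (read_empty_fifo and the path are made up for illustration): it opens the FIFO with the same flags as self.iFH and reads before anything has been written.

```python
import errno
import os
import tempfile


def read_empty_fifo():
    """Open a fresh FIFO non-blocking, read it while empty, return the errno."""
    tmpdir = tempfile.mkdtemp()
    path = os.path.join(tmpdir, "demo_fifo")  # hypothetical path
    os.mkfifo(path)
    fd = os.open(path, os.O_RDWR | os.O_NONBLOCK)
    try:
        os.read(fd, 16)  # nothing has been written yet
    except OSError as e:
        return e.errno
    finally:
        os.close(fd)
        os.remove(path)
        os.rmdir(tmpdir)
    return None


print(read_empty_fifo() == errno.EAGAIN)
```

If that is what is going on in my program, the error would simply mean "nothing to read yet" on a non-blocking descriptor rather than a clash between the two processes.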
The Magical Solution
I seem to have stopped the problem from happening. Please note this is not me answering my own question. My question is explained in the next section.
I changed the read functions to look more like this and it sorted stuff out:
    def base_read(self,iLen):
        while not self.ready_for_reading():
            import time
            print('.')
            time.sleep(0.5)
        lBytes = ''.encode('utf-8')
        while len(lBytes)<iLen:
            self.lock()
            try:
                lBytes += os.read(self.iFH,iLen)
            except OSError as e:
                if e.errno == 11:
                    import time
                    print('.')
                    time.sleep(0.5)
            finally:
                self.unlock()
        return lBytes

    def ready_for_reading(self):
        lR,lW,lX = select.select([self.iFH,],[],[],self.iTimeout)
        if not lR:
            return False
        lR,lW,lX = select.select([self.iFHBlocking],[],[],self.iTimeout)
        if not lR:
            return False
        return True
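Stripped of the class and the locking, the idea above boils down to "wait with select, then read until enough bytes have arrived". A minimal standalone sketch of that idea follows. The function read_exact and its timeout parameter are hypothetical, and it deliberately leaves the lock out. Unlike my loop above, which asks for iLen bytes on every pass and so could end up with more than iLen in total, this version only asks for the bytes still missing.

```python
import os
import select


def read_exact(fd, n, timeout=1.0):
    """Read exactly n bytes from a non-blocking fd, waiting with select."""
    buf = b""
    while len(buf) < n:
        readable, _, _ = select.select([fd], [], [], timeout)
        if not readable:
            continue  # timed out with no data; wait again
        try:
            chunk = os.read(fd, n - len(buf))  # never ask for more than is missing
        except BlockingIOError:
            continue  # EAGAIN: data was consumed between select and read
        if not chunk:
            raise EOFError("all writers closed before {0} bytes arrived".format(n))
        buf += chunk
    return buf
```

Called as read_exact(fd, 5), it keeps waiting until five bytes have been written to the other end, even if they arrive in several pieces.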
The Question
I'm struggling to find out exactly why the resource is temporarily unavailable. The two processes cannot access the actual named pipe at the same time because of the locking mechanism (unless I am mistaken?), so is this due to something more fundamental to FIFOs that my program is not taking into account?
All I really want is an explanation... The solution I found works but it looks like magic. Can anyone offer an explanation?
System
I had a similar problem with Java before. Have a look at the accepted answer: the problem was that I was creating new threads in a loop. I suggest you have a look at the code creating the pipe and make sure you are not creating multiple pipes.