How to reconnect a socket on asyncio?

Beyonlo Sam picture Beyonlo Sam · Sep 23, 2014 · Viewed 8.8k times · Source

I would like to create two protocols (TcpClient and UdpServer) with asyncio in app.py, where TcpClient keeps a persistent connection to server.py and UdpServer acts as a UDP server:

What I need:
a) Both protocols need to communicate by calling each other's methods. This only works on the first connection. If TcpClient reconnects, it can no longer send the string "send to tcp." coming from UdpServer. I checked with print(self): TcpClient creates a new instance while the old one still exists but has no connection, and I don't know how to refactor that. I think I am using asyncio the wrong way.
b) When TcpClient disconnects from server.py, it should wait 5 s and try to reconnect, and so on. I tried to do that using asyncio's call_later(), but I think there is a native way to do it rather than a workaround.
c) When I start app.py and TcpClient can't connect, I would like it to try again after 5 s, and so on. I don't know how to do that.

Here are my test examples of app.py and server.py. The server.py is just for tests - the real server will be written in another language.

Just to say what I tried:
1) When I start app.py while server.py is down, app.py doesn't retry.
2) When app.py is connected to server.py and the server goes down and quickly comes back up, TcpClient reconnects, but the two protocols can no longer call each other's methods on the new instance to send the string "send to tcp." to server.py; only the old instance is reachable, and it no longer has a connection.
3) If I use asyncio.async() instead of run_until_complete(), I can't call methods between the two protocols.

I put the app.py and server.py here, so you can just copy and run to tests.

I use ncat localhost 9000 -u -v to send the string "send to tcp.". This string needs to be printed by the UdpServer class and passed to the send_data_to_tcp method of the TcpClient class, which then sends it to server.py. <- This doesn't work after the first reconnect of TcpClient.

I'm using the Python 3.4.0.

Thank you so much.

app.py:

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
    """TCP client protocol that greets the server and relays to the UDP side.

    The UDP protocol is reached through the module-level ``server_udp``
    pair; index 1 of that pair is used as the protocol object.
    """

    message = 'Testing'

    def connection_made(self, transport):
        """Keep the transport, send the greeting and notify the UDP side."""
        self.transport = transport
        greeting = self.message
        transport.write(greeting.encode())
        print('data sent: {}'.format(greeting))
        server_udp[1].tcp_client_connected()

    def data_received(self, data):
        """Decode incoming bytes and hand the greeting echo to the UDP side."""
        text = data.decode()
        self.data = text
        print('data received: {}'.format(text))
        if text != 'Testing':
            return
        server_udp[1].send_data_to_udp(text)

    def send_data_to_tcp(self, data):
        """Encode ``data`` (a str) and write it to the TCP transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the lost connection, with the peer address, to the UDP side."""
        peer = self.transport.get_extra_info('peername')
        server_udp[1].tcp_client_disconnected(
            'Connection lost with the server...', peer)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):
    """UDP endpoint that forwards a trigger datagram to the TCP client and
    schedules a TCP reconnect after a disconnect.

    Uses the module-level names ``loop``, ``client_tcp`` and ``TcpClient``.
    """

    # Delay in seconds passed to call_later() before reconnecting.
    CLIENT_TCP_TIMEOUT = 5.0

    def __init__(self):
        # Handle returned by call_later() for the pending reconnect, if any.
        self.client_tcp_timeout = None

    def connection_made(self, transport):
        """Log and keep the datagram transport."""
        print('start', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        """Strip and decode the datagram; forward 'send to tcp.' to the TCP client."""
        self.data = data.strip()
        self.data = self.data.decode()
        print('Data received:', self.data, addr)
        if self.data == 'send to tcp.':
            # Reads the module-level client_tcp pair; index 1 is used as the
            # TCP protocol instance.
            client_tcp[1].send_data_to_tcp(self.data)

    def connection_lost(self, exc):
        """Log that the datagram endpoint stopped."""
        print('stop', exc)

    def send_data_to_udp(self, data):
        """Print data handed over by the TCP client."""
        print('Receiving on UDPServer Class: ', (data))

    def connect_client_tcp(self):
        """Start a new TCP connection attempt (scheduled via call_later())."""
        coro = loop.create_connection(TcpClient, 'localhost', 8000)
        #client_tcp = loop.run_until_complete(coro)
        # NOTE(review): there is no `global client_tcp` in this method, so the
        # assignment below binds a local name only, and asyncio.async() returns
        # a Task rather than the (transport, protocol) pair. The module-level
        # client_tcp read by datagram_received() is therefore left unchanged.
        client_tcp = asyncio.async(coro)

    def tcp_client_disconnected(self, data, info):
        """Record the peer info and schedule connect_client_tcp() after the timeout."""
        print(data)
        self.client_tcp_info = info
        self.client_tcp_timeout = asyncio.get_event_loop().call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

    def tcp_client_connected(self):
        """Cancel the stored reconnect handle, if one was ever scheduled."""
        if self.client_tcp_timeout:
            self.client_tcp_timeout.cancel()
            print('call_later cancel.')

# Module-level event loop; UdpServer.connect_client_tcp() also uses this name.
loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)) 
#server_udp = asyncio.Task(coro)
# server_udp is the value returned by create_datagram_endpoint(); TcpClient
# reaches the UDP protocol through server_udp[1].
server_udp = loop.run_until_complete(coro)


#TCP client
coro = loop.create_connection(TcpClient, 'localhost', 8000)
#client_tcp = asyncio.async(coro)
# Single connection attempt with no retry: if it fails, the exception
# propagates out of run_until_complete() and the script stops here.
client_tcp = loop.run_until_complete(coro)

# Serve both endpoints until the process is stopped.
loop.run_forever()

server.py:

import asyncio

class EchoServer(asyncio.Protocol):
    """Minimal TCP echo protocol: logs what it receives and sends it back."""

    def connection_made(self, transport):
        """Log the peer address and keep the transport for later writes."""
        remote = transport.get_extra_info('peername')
        banner = 'connection from {}'.format(remote)
        print(banner)
        self.transport = transport

    def data_received(self, data):
        """Log the decoded payload, then echo the raw bytes to the peer."""
        text = data.decode()
        print('data received: {}'.format(text))
        self.transport.write(data)
        # The connection is intentionally left open after the echo; closing it
        # here would be done with self.transport.close().



# Start the echo server on localhost:8000 and serve until interrupted.
loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, 'localhost', 8000)
server = loop.run_until_complete(coro)
# Debug output: show the server object and the attributes it exposes.
print(server)
print(dir(server))
print(dir(server.sockets))

print('serving on {}'.format(server.sockets[0].getsockname()))

try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl+C ends run_forever(); cleanup happens in the finally clause.
    print("exit")
finally:
    # Stop listening and release the loop however run_forever() ended.
    server.close()
    loop.close()

Answer

dano picture dano · Sep 23, 2014

You really only need a few small fixes. First, I wrote a coroutine to handle the connection retries:

@asyncio.coroutine
def do_connect():
    """Connect the TCP client to localhost:8000, retrying until it succeeds.

    The pair returned by ``loop.create_connection`` is stored in the
    module-level ``client_tcp`` -- the name ``UdpServer.datagram_received``
    reads when it forwards data -- and is also returned to the caller.

    Returns:
        The ``(transport, protocol)`` pair of the established connection.
    """
    # Must be client_tcp: that is the global the rest of app.py uses. Binding
    # any other name would leave the UDP side talking to the dead connection.
    global client_tcp
    while True:
        try:
            client_tcp = yield from loop.create_connection(TcpClient,
                                                           'localhost', 8000)
        except OSError:
            # Server unreachable: wait, then try again.
            print("Server not up retrying in 5 seconds...")
            yield from asyncio.sleep(5)
        else:
            return client_tcp

Then we use this to start everything up:

# Module-level event loop shared by do_connect() and UdpServer.
loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)) 
server_udp = loop.run_until_complete(coro)

#TCP client
# Blocks until do_connect() finishes, i.e. until a connection attempt succeeds.
loop.run_until_complete(do_connect())

# Serve both endpoints until the process is stopped.
loop.run_forever()

The next piece is handling the server going down/coming back after app.py is started up. We need to fix tcp_client_disconnected and connect_client_tcp to handle that properly:

def connect_client_tcp(self):
    """Start a background reconnect and publish the new connection.

    Schedules ``do_connect()`` on the module-level ``loop`` and, when that
    task finishes with a non-None result, stores the result in the
    module-level ``client_tcp``.
    """
    # loop.create_task replaces asyncio.async, which cannot even be parsed on
    # Python 3.7+ because `async` became a reserved word.
    task = loop.create_task(do_connect())

    def cb(future):
        # A done-callback receives the finished Task itself, not its result,
        # and `global` has to be declared in the function that assigns.
        global client_tcp
        if future.cancelled():
            return
        # result() re-raises any unexpected error from do_connect() so the
        # event loop reports it instead of it being silently dropped.
        result = future.result()
        if result is not None:
            # do_connect() may already have rebound client_tcp itself and
            # returned None; only overwrite when it hands back a connection.
            client_tcp = result

    task.add_done_callback(cb)

def tcp_client_disconnected(self, data, info):
    """Log the disconnect, remember the peer info and schedule a reconnect.

    ``data`` is the message to print and ``info`` the peer address reported
    by the TCP client. The reconnect runs ``self.connect_client_tcp`` after
    ``CLIENT_TCP_TIMEOUT`` seconds on the module-level ``loop``.
    """
    print(data)
    self.client_tcp_info = info
    reconnect_handle = loop.call_later(
        self.CLIENT_TCP_TIMEOUT,
        self.connect_client_tcp,
    )
    # Kept so the timer can be cancelled later.
    self.client_tcp_timeout = reconnect_handle

The interesting piece is connect_client_tcp. You had two problems with your original version:

  1. You were assigning client_tcp directly to the result of asyncio.async(coro), which means client_tcp was assigned to an asyncio.Task. That wasn't what you wanted; you wanted client_tcp to be assigned to the result of the completed asyncio.Task. We achieve that by using task.add_done_callback to assign client_tcp to the result of the Task once it completes.

  2. You forgot global client_tcp at the top of the method. Without that, you were just creating a local variable called client_tcp, which was being thrown away at the end of connect_client_tcp.

Once those issues are fixed, I'm able to run app.py and start/stop server.py whenever I want, and I always see all messages properly delivered from ncat to server.py when all three components are running together.

Here's the complete app.py, for easy copy/pasting:

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
    """TCP client protocol that greets the server and relays to the UDP side.

    The UDP protocol is reached through the module-level ``server_udp``
    pair; index 1 of that pair is used as the protocol object.
    """

    message = 'Testing'

    def connection_made(self, transport):
        """Keep the transport, send the greeting and notify the UDP side."""
        self.transport = transport
        greeting = self.message
        transport.write(greeting.encode())
        print('data sent: {}'.format(greeting))
        server_udp[1].tcp_client_connected()

    def data_received(self, data):
        """Decode incoming bytes and hand the greeting echo to the UDP side."""
        text = data.decode()
        self.data = text
        print('data received: {}'.format(text))
        if text != 'Testing':
            return
        server_udp[1].send_data_to_udp(text)

    def send_data_to_tcp(self, data):
        """Encode ``data`` (a str) and write it to the TCP transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the lost connection, with the peer address, to the UDP side."""
        peer = self.transport.get_extra_info('peername')
        server_udp[1].tcp_client_disconnected(
            'Connection lost with the server...', peer)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):
    """UDP endpoint that forwards a trigger datagram to the TCP client and
    schedules TCP reconnects.

    Uses the module-level names ``loop``, ``client_tcp`` (the pair whose
    index 1 is the TCP protocol) and the ``do_connect`` coroutine.
    """

    # Delay in seconds between losing the TCP connection and reconnecting.
    CLIENT_TCP_TIMEOUT = 5.0

    def __init__(self):
        # Handle returned by call_later() for the pending reconnect, if any.
        self.client_tcp_timeout = None

    def connection_made(self, transport):
        """Log and keep the datagram transport."""
        print('start', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        """Strip and decode the datagram; forward 'send to tcp.' to the TCP client."""
        self.data = data.strip()
        self.data = self.data.decode()
        print('Data received:', self.data, addr)
        if self.data == 'send to tcp.':
            client_tcp[1].send_data_to_tcp(self.data)

    def connection_lost(self, exc):
        """Log that the datagram endpoint stopped."""
        print('stop', exc)

    def send_data_to_udp(self, data):
        """Print data handed over by the TCP client."""
        print('Receiving on UDPServer Class: ', (data))

    def connect_client_tcp(self):
        """Start a background reconnect and publish the new connection.

        Schedules ``do_connect()`` on the module-level ``loop`` and, when
        that task finishes with a non-None result, stores the result in the
        module-level ``client_tcp``.
        """
        # loop.create_task replaces asyncio.async, which cannot even be parsed
        # on Python 3.7+ because `async` became a reserved word. The unused
        # loop.create_connection(...) coroutine that used to be created here
        # was never awaited, so it is gone.
        task = loop.create_task(do_connect())

        def cb(future):
            # A done-callback receives the finished Task itself, not its
            # result, and `global` must be declared in the assigning function.
            global client_tcp
            if future.cancelled():
                return
            # result() re-raises any unexpected error from do_connect() so the
            # event loop reports it instead of it being silently dropped.
            result = future.result()
            if result is not None:
                # do_connect() may already have rebound client_tcp itself and
                # returned None; only overwrite when it returns a connection.
                client_tcp = result

        task.add_done_callback(cb)

    def tcp_client_disconnected(self, data, info):
        """Record the peer info and schedule connect_client_tcp() after the timeout."""
        print(data)
        self.client_tcp_info = info
        self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

    def tcp_client_connected(self):
        """Cancel the stored reconnect handle, if one was ever scheduled."""
        if self.client_tcp_timeout:
            self.client_tcp_timeout.cancel()
            print('call_later cancel.')

@asyncio.coroutine
def do_connect(retry_delay=5):
    """Connect the TCP client to localhost:8000, retrying until it succeeds.

    The pair returned by ``loop.create_connection`` is stored in the
    module-level ``client_tcp`` and is also returned to the caller.

    Args:
        retry_delay: Seconds to wait after a failed attempt. Defaults to 5,
            the value announced in the retry message.

    Returns:
        The ``(transport, protocol)`` pair of the established connection.
    """
    global client_tcp
    while True:
        try:
            client_tcp = yield from loop.create_connection(TcpClient, 'localhost', 8000)
        except OSError:
            # Server unreachable: sleep for exactly the delay that is printed.
            print("Server not up retrying in {} seconds...".format(retry_delay))
            yield from asyncio.sleep(retry_delay)
        else:
            return client_tcp

# Module-level event loop shared by do_connect() and UdpServer.
loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
# TcpClient reaches the UDP protocol through server_udp[1].
server_udp = loop.run_until_complete(coro)

#TCP client
# Blocks until do_connect() finishes, i.e. until a connection attempt succeeds.
loop.run_until_complete(do_connect())

# Serve both endpoints until the process is stopped.
loop.run_forever()