Python 3 - Global Variables with AsyncIO/APScheduler

LikesAChallenge picture LikesAChallenge · May 19, 2015 · Viewed 10.5k times · Source

Been struggling with this for a while.

Based on this thread: Using global variables in a function other than the one that created them

I should be able to update the variable used by thread_2 by scheduling a task at certain times.

The code:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
import time


def day_limits():

        global variable
        variable = 90
        print ('Day Variable: ',variable)

def night_limits():

        global variable
        variable = 65
        print ('Night Variable: ',variable)


def thread_2():


    while True:

        c_hour = int(datetime.now().strftime("%H"))
        c_min = int(datetime.now().strftime("%M"))
        c_sec = int(datetime.now().strftime("%S"))

        print ('%02d:%02d:%02d - Variable: %d ' % (c_hour,c_min,c_sec,variable))

        time.sleep(2)


if __name__ == "__main__":

    variable = 60

    scheduler = AsyncIOScheduler()
    scheduler.add_job(day_limits, 'cron', hour=7,misfire_grace_time=3600,timezone='GB')
    scheduler.add_job(night_limits, 'cron', hour=19, minute=32,misfire_grace_time=3600,timezone='GB')
    scheduler.start()

    scheduler.print_jobs()

    executor = ProcessPoolExecutor(1)
    loop = asyncio.get_event_loop()
    baa = asyncio.async(loop.run_in_executor(executor, thread_2))


    try:
        loop.run_forever()

    except (KeyboardInterrupt, Exception):
        loop.stop()
        scheduler.shutdown()

Results in:

19:31:54 - Variable: 60 
19:31:56 - Variable: 60 
19:31:58 - Variable: 60    
Night Variable:  65
19:32:00 - Variable: 60 
19:32:02 - Variable: 60 

I am missing something, but I can't see what!

Thoughts?

Thanks!!!

Answer

dano picture dano · May 21, 2015

Because you're using a ProcessPoolExecutor, you need to use a process-safe object in place of an ordinary integer. If you only need to support Linux (and can therefore rely on having fork()), you can just use an ordinary, global multiprocessing.Value to do this.

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
import time


def day_limits():
        variable.value = 90
        print ('Day Variable: ',variable.value)

def night_limits():
        variable.value = 65
        print ('Night Variable: ',variable.value)


def thread_2():
    while True:
        c_hour = int(datetime.now().strftime("%H"))
        c_min = int(datetime.now().strftime("%M"))
        c_sec = int(datetime.now().strftime("%S"))

        print ('%02d:%02d:%02d - Variable: %d ' % (c_hour,c_min,c_sec,variable.value))

        time.sleep(2)


if __name__ == "__main__":
    variable = multiprocessing.Value('i', 60)

    scheduler = AsyncIOScheduler()
    scheduler.add_job(day_limits, 'cron', hour=7,misfire_grace_time=3600,timezone='GB')
    scheduler.add_job(night_limits, 'cron', hour=19, minute=32,misfire_grace_time=3600,timezone='GB')
    scheduler.start()

    scheduler.print_jobs()

    executor = ProcessPoolExecutor(1)
    loop = asyncio.get_event_loop()
    baa = asyncio.async(loop.run_in_executor(executor, thread_2))


    try:
        loop.run_forever()

    except (KeyboardInterrupt, Exception):
        loop.stop()
        scheduler.shutdown()

If you need to support both Windows and Linux, you'll need to use a multiprocessing.Manager to create the Value object, and explicitly pass that object to the function you're running in the Executor:

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
import time


def day_limits():
        variable.value = 90
        print ('Day Variable: ',variable.value)

def night_limits():
        variable.value = 65
        print ('Night Variable: ',variable.value)


def thread_2(variable):
    while True:
        c_hour = int(datetime.now().strftime("%H"))
        c_min = int(datetime.now().strftime("%M"))
        c_sec = int(datetime.now().strftime("%S"))

        print ('%02d:%02d:%02d - Variable: %d ' % (c_hour,c_min,c_sec,variable.value))

        time.sleep(2)


if __name__ == "__main__":

    m = multiprocessing.Manager()
    variable = m.Value('i', 60)

    scheduler = AsyncIOScheduler()
    scheduler.add_job(day_limits, 'cron', hour=7,misfire_grace_time=3600,timezone='GB')
    scheduler.add_job(night_limits, 'cron', hour=19, minute=32,misfire_grace_time=3600,timezone='GB')
    scheduler.start()

    scheduler.print_jobs()

    executor = ProcessPoolExecutor(1)
    loop = asyncio.get_event_loop()
    baa = asyncio.async(loop.run_in_executor(executor, thread_2, variable))  # Need to pass variable explicitly

    try:
        loop.run_forever()

    except (KeyboardInterrupt, Exception):
        loop.stop()
        scheduler.shutdown()

Because Windows lacks fork support, you need to explicitly pass the Value to the function you're running in the Executor. If you don't, the child process will say that the variable doesn't exist. However, since you're explicitly passing the Value to the run_in_executor method, you can't use an ordinary multiprocessing.Value - you'll get a RuntimeError saying that "Synchronized objects should only be shared between processes through inheritance".

Using the multiprocessing.Manager works around this; the multiprocessing.Manager starts a process that can create and managed process-shared objects. Calling m.Value() returns a Proxy to a shared Value, and that Proxy can be passed to run_in_executor without raising an exception.