Source code for zeppy.ppipes

# Copyright (c) 2020, 2022 Santosh Philip
# =======================================================================
#  Distributed under the MIT License.
#  (See accompanying file LICENSE or copy at
#  http://opensource.org/licenses/MIT)
# =======================================================================
# -*- coding: utf-8 -*-
"""parallel pipeline functions"""


import time
import os.path
import string
import random
import multiprocessing
from datetime import date, datetime
from functools import wraps

import zmq


[docs]def measure(func): @wraps(func) def _time_it(*args, **kwargs): start = int(round(time.time() * 1000)) try: return func(*args, **kwargs) finally: end_ = int(round(time.time() * 1000)) - start print(f"Total execution time: {end_ if end_ > 0 else 0} ms") return _time_it
[docs]def randstr(strlength): """return a string of length strlength""" randletters = string.ascii_lowercase + string.digits return "".join([random.choice(randletters) for i in range(strlength)])
def _v_print(txt, verbose=False): """print if verbose==True""" if verbose: print(txt) def _zmq_sink(verbose=False, sinkipc=None, killipc=None, resultipc=None): """zmq sink for rweather""" context = zmq.Context() # Socket to receive messages on receiver = context.socket(zmq.PULL) receiver.bind(sinkipc) # socket to publish end of tasks endpubliser = context.socket(zmq.PUB) endpubliser.bind(killipc) # socket to publish results resultpubliser = context.socket(zmq.PUB) resultpubliser.bind(resultipc) num_calc = receiver.recv_pyobj() # num_calc = num_calc.decode() num_calc = int(num_calc) _v_print(f"number of calculations = {num_calc}", verbose=verbose) # Start our clock now tstart = time.time() allresults = list() for task_nbr in range(num_calc): s = receiver.recv_pyobj() allresults.append(s) # end the workers # print("sending end message") endpubliser.send(b"end this now") # publish the results # print("publishing the results") allresults.sort() # make sure they are in the right order # remove order index and publish it resultpubliser.send_pyobj([line for _, line in allresults]) # resultpubliser.send_pyobj(allresults) # Calculate and report duration of batch tend = time.time() _v_print( "Total taken time for all calcs: %d msec" % ((tend - tstart) * 1000), verbose=verbose, ) def _zmq_worker( func, wnum=None, verbose=False, sinkipc=None, killipc=None, ventipc=None ): """zmq worker for rweather""" if wnum is None: wnum = 0 # use wnum, inc case you want to now which woker did the job context = zmq.Context() # Socket to receive messages on receiver = context.socket(zmq.PULL) receiver.connect(ventipc) # Socket to send messages to sink sender = context.socket(zmq.PUSH) sender.connect(sinkipc) # socket to recieve end message to stop this worker endsubscriber = context.socket(zmq.SUB) endsubscriber.connect(killipc) endsubscriber.setsockopt_string(zmq.SUBSCRIBE, "") # Initialize poll set poller = zmq.Poller() poller.register(receiver, zmq.POLLIN) poller.register(endsubscriber, zmq.POLLIN) # Process tasks forever while True: try: socks = dict(poller.poll()) except KeyboardInterrupt: break if receiver in socks: try: i, task = receiver.recv_pyobj() _v_print(f"running item: {i}, in worker: {wnum}", verbose=verbose) # get the args and kwargs try: args = task["args"] except KeyError as e: args = list() try: kwargs = task["kwargs"] except KeyError as e: kwargs = dict() # Do the work result = func(*args, **kwargs) # Send results to sink with index number i sender.send_pyobj((i, result)) except Exception as e: print("****ERROR****", e) _v_print( f"****ERROR**** above error while running item: {i}, in worker: {wnum}", verbose=True, ) sender.send_pyobj((i, "****ERROR****")) # TODO send this as a log message to logger _v_print( f"sent result of item: {i}, in worker: {wnum} to sink", verbose=verbose ) if endsubscriber in socks: message = endsubscriber.recv() break def _zmq_vent(args_list, verbose=False, sleeptime=0.1, sinkipc=None, ventipc=None): """zmq vent for rweather""" context = zmq.Context() # Socket to send messages on sender = context.socket(zmq.PUSH) sender.bind(ventipc) # Socket with direct access to the sink: used to synchronize start of batch # if sink is not running at this point - whole thing gets fucked up sink = context.socket(zmq.PUSH) sink.connect(sinkipc) totwork = len(args_list) sink.send_pyobj(totwork) # total_msec = 0 for i, task in enumerate(args_list): sender.send_pyobj((i, task)) # Give 0MQ time to deliver - otherwise all of it will go to one worker # print(f"sleeptime={sleeptime}") time.sleep(sleeptime) def _zmq_resultsub(verbose=False, resultipc=None): """get the results of the reweather calculations""" # Socket to talk to server context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect(resultipc) socket.setsockopt_string(zmq.SUBSCRIBE, "") # print('started result subscriber') message = socket.recv_pyobj() return message def _fan_out_in( func, args_list, nworkers=None, verbose=False, sleeptime=0.1, ipcs=None ): """Starts a distributed zmq run of `func` Each instance of `func` is run in a separate process Uses the classic patallel-pipeline from zmq - vent -> workers -> sink The results are published in PUB-SUB pattern""" sinkipc = ipcs["sinkipc"] killipc = ipcs["killipc"] resultipc = ipcs["resultipc"] ventipc = ipcs["ventipc"] # starts the workers if nworkers is None: nworkers = len(args_list) for i in range(nworkers): # for i in range(1): p = multiprocessing.Process( target=_zmq_worker, args=( func, i, ), kwargs={ "verbose": verbose, "sinkipc": sinkipc, "killipc": killipc, "ventipc": ventipc, }, ) p.start() _v_print(f"started worker {i}", verbose=verbose) # Starts sink p = multiprocessing.Process( target=_zmq_sink, kwargs={ "verbose": verbose, "sinkipc": sinkipc, "killipc": killipc, "resultipc": resultipc, }, ) p.start() _v_print("starting sink", verbose=verbose) # starts the vent p = multiprocessing.Process( target=_zmq_vent, args=(args_list,), kwargs={ "verbose": verbose, "sleeptime": sleeptime, "sinkipc": sinkipc, "ventipc": ventipc, }, ) p.start() _v_print("started ventilator", verbose=verbose) def _sinkipc(size=8): return f"ipc:///tmp/zeppysink_{randstr(size)}" def _killipc(size=8): return f"ipc:///tmp/zeppykill_{randstr(size)}" def _resultipc(size=8): return f"ipc:///tmp/zeppyresult_{randstr(size)}" def _ventipc(size=8): return f"ipc:///tmp/zeppyvent_{randstr(size)}"
[docs]def ipc_parallelpipe(func, args_list, nworkers=None, verbose=False, sleeptime=0.1): """distributed run of the func using zmq Returns the results of all the run""" args_list = args_kwargs_helper(args_list) # generate ipcs sz = 3 ipcs = dict( sinkipc=_sinkipc(sz), killipc=_killipc(sz), resultipc=_resultipc(sz), ventipc=_ventipc(sz), ) _fan_out_in( func, args_list, nworkers=nworkers, verbose=verbose, sleeptime=sleeptime, ipcs=ipcs, ) # -> parallel-pipline publishing the results message = _zmq_resultsub( resultipc=ipcs["resultipc"] ) # subscribes to the published results return message
[docs]def args_kwargs_helper(args_kwargs_list): return [clean_args_kwargs(item) for item in args_kwargs_list]
[docs]def clean_args_kwargs(args_kwargs): if isinstance(args_kwargs, dict): return args_kwargs if isinstance(args_kwargs, (list, tuple)): return {"args": args_kwargs, "kwargs": {}} return {"args": (args_kwargs,), "kwargs": {}}
# ---- the stuff above should be generic
[docs]def waitsome(seconds): """wait for some seconds""" time.sleep(seconds) return seconds
[docs]def wait_add(first, second): """wait for the sum of first and second. return the sum""" seconds = first + second return waitsome(seconds)
[docs]def wait_add_mult(first, add=0, mult=1): """calculate the result=(first+add)*mult. Then waitsome(result)""" result = (first + add) * mult return waitsome(result)
[docs]def idfversion(idf): versions = idf.idfobjects["version"] ver = versions[0] return ver.Version_Identifier
[docs]def eplaunch_run(idf): # import witheppy.runner # witheppy.runner.eplaunch_run(idf) # idf.run(output_directory='./eplus/', output_prefix='C') import subprocess import os.path wfile = idf.epw idfname = idf.idfname justname = os.path.basename(idfname).split(".")[0] dirname = os.path.dirname(idfname) runstr = f"/Applications/EnergyPlus-9-1-0/energyplus -d {dirname} -p {justname} -s C -w {wfile} {idfname}" # runstr = f'/Applications/EnergyPlus-9-1-0/energyplus --help' # runargs = ['/Applications/EnergyPlus-9-1-0/energyplus', '-d ./eplus_files/ -p Minimal -s C ./eplus_files/Minimal.idf'] subprocess.check_call(runstr.split()) return None
[docs]def idf_run(idf): import witheppy.runner witheppy.runner.eplaunch_run(idf)
# does not wirk with ppipes
[docs]def idf_multirun(idf_kwargs): import eppy idf = idf_kwargs["args"] options = idf_kwargs["kwargs"] idf.run(**options)
[docs]def make_options(idf): idfversion = idf.idfobjects["version"][0].Version_Identifier.split(".") idfversion.extend([0] * (3 - len(idfversion))) idfversionstr = "-".join([str(item) for item in idfversion]) fname = idf.idfname options = { "ep_version": idfversionstr, "output_prefix": os.path.basename(fname).split()[0], "output_suffix": "C", "output_directory": os.path.dirname(fname), } return options
[docs]@measure def runeverything(): # waitlist = [1, 2, 3, 2, 1] # # waitlist = [(1, ), (2, ), (3, ), (2, ), (1, )] # # waitlist = [(1, 0), (1, 1), (2, 1), (2, 0), (0, 1)] # # waitlist = [(1, 0, 1), (1, 1, 1), (2, 1, 1), (2, 0, 1), (0, 1, 1)] # waitlist = [(1, 0, 1), (1, 1, 1), (2, 1, 1), (2, 0, 1), (0, 1, 1)] # # waitlist = [(1, 0, 1), ] # # waitlist = [{'args':(1, ), 'kwargs':{'add':0, 'mult':1}}] # waitlist = [ # # {'args':1, 'kwargs':{'add':3}}, # {'args':(1,), 'kwargs':{'mult':3}}, # {'args':(1,), 'kwargs':{'add':2, 'mult':3}}, # {'args': (1, 2, 3)}, # ] # print(waitlist) # func = wait_add_mult # result = ipc_parallelpipe(func, waitlist, nworkers=None, verbose=True) # # print(result) # running eppy in zeppy import eppy fnames = [ # "./eplus_files/Minimal.idf", "./eplus_files/UnitHeaterGasElec.idf", "./eplus_files/ZoneWSHP_wDOAS.idf", "./eplus_files/ZoneWSHP_wDOAS_1.idf", ] wfile = ( "/Applications/EnergyPlus-9-1-0/WeatherData/USA_CO_Golden-NREL.724666_TMY3.epw" ) idfs = [eppy.openidf(fname, epw=wfile) for fname in fnames] waitlist = [[{"args": idf, "kwargs": make_options(idf)}] for idf in idfs] func = idf_multirun result = ipc_parallelpipe(func, waitlist, nworkers=None, verbose=True, sleeptime=1) # sleeptime=1 sec. This is a pause between sending the task out. Not sure if a single worker is grabbing all the tasks in E+. May need some testing to confirm. print(result)
if __name__ == "__main__": runeverything()