{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Using UNIX pipes - Simple use of zeppy\n", "\n", "The tutorial and examples are being written alongside the code, so there is a quality of exploration to this section. Let us try to run the simplest code possible with this package.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*Note:* You can find this `jupyter notebook` at `docs/tutorial_docs/simple_zeppy.ipynb`" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# setting the path name so that the tutorial works from this directory\n", "# if you install zeppy with `pip install zeppy` you will not need to do this\n", "import sys\n", "# pathnameto_zeppy = 'c:/eppy'\n", "pathnameto_zeppy = '../../'\n", "sys.path.append(pathnameto_zeppy)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from zeppy import ppipes\n", "import time" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The function `waitsome` waits for the given number of `seconds` and then returns that number." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# from ppipes.py\n", "def waitsome(seconds):\n", "    \"\"\"wait for some seconds\"\"\"\n", "    time.sleep(seconds)\n", "    return seconds" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us call the function 5 times and see how long it takes." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "time to run this = 9.024 seconds\n" ] } ], "source": [ "waitlist = [1, 2, 3, 2, 1]\n", "starttime = time.time()\n", "for seconds in waitlist:\n", "    ppipes.waitsome(seconds)\n", "endtime = time.time()\n", "print(f'time to run this = {(endtime-starttime):.3f} seconds')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It took about 9.02 seconds to run, pretty close to 1+2+3+2+1 = 9 seconds.\n", "Let us try this 
again using the `ipc_parallelpipe` function." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "time to run this = 3.485 seconds\n", "[1, 2, 3, 2, 1]\n" ] } ], "source": [ "waitlist = [1, 2, 3, 2, 1]\n", "starttime = time.time()\n", "result = ppipes.ipc_parallelpipe(ppipes.waitsome, waitlist)\n", "endtime = time.time()\n", "print(f'time to run this = {(endtime-starttime):.3f} seconds')\n", "print(result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It took about 3.49 seconds. `ipc_parallelpipe` also ran the function 5 times, but it ran the calls in parallel (simultaneously), so the total time is close to the longest single wait of 3 seconds.\n", "\n", "Now let us run it with `verbose=True`, so we can see what is going on under the hood." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "started worker 0\n", "started worker 1\n", "started worker 2\n", "started worker 3\n", "started worker 4\n", "starting sink\n", "started ventilator\n", "time to run this = 3.460 secondsrunning item: 1, in worker: 2\n", "sent result of item: 1, in worker: 2 to sink\n", "running item: 2, in worker: 1\n", "sent result of item: 2, in worker: 1 to sink\n", "running item: 4, in worker: 3\n", "sent result of item: 4, in worker: 3 to sink\n", "running item: 0, in worker: 0\n", "sent result of item: 0, in worker: 0 to sink\n", "running item: 3, in worker: 4\n", "sent result of item: 3, in worker: 4 to sink\n", "number of calculations = 5\n", "Total taken time for all calcs: 3351 msec\n", "\n", "result = [1, 2, 3, 2, 1]\n" ] } ], "source": [ "waitlist = [1, 2, 3, 2, 1]\n", "starttime = time.time()\n", "result = ppipes.ipc_parallelpipe(ppipes.waitsome, waitlist, verbose=True)\n", "endtime = time.time()\n", "print(f'time to run this = {(endtime-starttime):.3f} seconds')\n", "print(f'result = {result}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### The 
output tells you what is happening:\n", "\n", "- It starts up 5 workers to run `waitsome` 5 times.\n", "- It starts up the sink. The sink collects the results from the workers.\n", "- Then it starts the ventilator, which sends the 5 tasks to the workers.\n", "- The tasks are distributed evenly between the workers, in no particular order.\n", "- Notice that the first worker got its task before the ventilator finished distributing all the tasks and printed `started ventilator`.\n", "\n", "Now let us try this with only 2 workers." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "started worker 0\n", "started worker 1\n", "starting sink\n", "started ventilator\n", "time to run this = 5.248 secondsnumber of calculations = 5\n", "Total taken time for all calcs: 5166 msec\n", "running item: 0, in worker: 1\n", "sent result of item: 0, in worker: 1 to sink\n", "running item: 2, in worker: 1\n", "sent result of item: 2, in worker: 1 to sink\n", "running item: 4, in worker: 1\n", "sent result of item: 4, in worker: 1 to sink\n", "running item: 1, in worker: 0\n", "sent result of item: 1, in worker: 0 to sink\n", "running item: 3, in worker: 0\n", "sent result of item: 3, in worker: 0 to sink\n", "\n", "[1, 2, 3, 2, 1]\n" ] } ], "source": [ "waitlist = [1, 2, 3, 2, 1]\n", "starttime = time.time()\n", "result = ppipes.ipc_parallelpipe(ppipes.waitsome, waitlist, verbose=True, nworkers=2)\n", "endtime = time.time()\n", "print(f'time to run this = {(endtime-starttime):.3f} seconds')\n", "print(result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Looks like `worker 1` made 3 runs, and `worker 0` made 2 runs. The whole thing took about 5.25 seconds. Still quicker than 9 seconds.\n", "\n", "This is fun. Can we do some EnergyPlus runs now? Ha! Hold on. Let us move slowly. 
Here is a different function to try." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def wait_add(first, second):\n", "    \"\"\"wait for the sum of first and second. return the sum\"\"\"\n", "    seconds = first + second\n", "    return waitsome(seconds)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "How are we going to send two args to the function? Each item in the list is a tuple holding the positional arguments for one call." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "started worker 0\n", "started worker 1\n", "started worker 2\n", "started worker 3\n", "started worker 4\n", "starting sink\n", "started ventilator\n", "time to run this = 3.464 secondsrunning item: 1, in worker: 0\n", "sent result of item: 1, in worker: 0 to sink\n", "running item: 2, in worker: 2\n", "sent result of item: 2, in worker: 2 to sink\n", "running item: 0, in worker: 1\n", "sent result of item: 0, in worker: 1 to sink\n", "number of calculations = 5\n", "Total taken time for all calcs: 3352 msec\n", "\n", "[1, 2, 3, 2, 1]\n", "running item: 4, in worker: 4\n", "sent result of item: 4, in worker: 4 to sink\n", "running item: 3, in worker: 3\n", "sent result of item: 3, in worker: 3 to sink\n" ] } ], "source": [ "waitlist = [(1, 0), (1, 1), (2, 1), (2, 0), (0, 1)]\n", "starttime = time.time()\n", "result = ppipes.ipc_parallelpipe(ppipes.wait_add, waitlist, verbose=True)\n", "endtime = time.time()\n", "print(f'time to run this = {(endtime-starttime):.3f} seconds')\n", "print(result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "How about a function that has named arguments? Let us try that." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Can we send named arguments to the workers? 
\n", "Suppose we have a function called `wait_add_mult`." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "def wait_add_mult(first, add=0, mult=1):\n", "    \"\"\"calculate the result=(first+add)*mult. Then waitsome(result)\"\"\"\n", "    result = (first + add) * mult\n", "    return waitsome(result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And we want to call it in the following ways:\n", "\n", "- `wait_add_mult(1, add=3)`\n", "- `wait_add_mult(1, mult=3)`\n", "- `wait_add_mult(1, add=2, mult=3)`\n", "- `wait_add_mult(1, 2, 3)`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have to send the args to it in the following way, as dictionaries with the keys `args` and `kwargs`:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- `wait_add_mult(1, add=3) # {'args':(1,), 'kwargs':{'add':3}}`\n", "- `wait_add_mult(1, mult=3) # {'args':(1,), 'kwargs':{'mult':3}}`\n", "- `wait_add_mult(1, add=2, mult=3) # {'args':(1,), 'kwargs':{'add':2, 'mult':3}}`\n", "- `wait_add_mult(1, 2, 3) # {'args': (1, 2, 3)}`" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "started worker 0\n", "started worker 1\n", "started worker 2\n", "started worker 3\n", "starting sink\n", "started ventilator\n", "[4, 3, 9, 9]running item: 0, in worker: 0\n", "sent result of item: 0, in worker: 0 to sink\n", "running item: 1, in worker: 0\n", "sent result of item: 1, in worker: 0 to sink\n", "running item: 3, in worker: 3\n", "sent result of item: 3, in worker: 3 to sink\n", "running item: 2, in worker: 1\n", "sent result of item: 2, in worker: 1 to sink\n", "\n", "number of calculations = 4\n", "Total taken time for all calcs: 9466 msec\n" ] } ], "source": [ " waitlist = [\n", " {'args':(1,), 'kwargs':{'add':3}},\n", " {'args':(1,), 'kwargs':{'mult':3}},\n", " {'args':(1,), 'kwargs':{'add':2, 'mult':3}},\n", " {'args': (1, 2, 3)},\n", " ]\n", " func = ppipes.wait_add_mult\n", " result = 
ppipes.ipc_parallelpipe(func, waitlist, nworkers=None, verbose=True)\n", " print(result)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That's all for now. Take a look at the next chapter." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" } }, "nbformat": 4, "nbformat_minor": 4 }