Légáramlás: Hogyan lehet több ezer feladatot ütemezni egy lövésben

Ebben a bejegyzésben megvitatom, hogyan lehet ütemezni több ezer feladatot egy dag belsejében. Nem fogok arra összpontosítani, hogy mi az Airflow és hogyan lehet telepíteni, hanem ehelyett azt fogom megvitatni, hogy miként lehet ütemezni nagyszámú feladatot egy dag belsejében.

Alapvetően az Airflow több DAG-nak van kialakítva, és a DAG-n belül több száz vagy ezer feladat is elvégezhető. Szóval, mi történik, ha nagyszámú feladatot akarunk ütemezni, mondjuk kb. 60000 körül vagy annál több? Ezt magyaráztam ebben a blogban.

Az Airflow-val dolgozom a munkafolyamat automatizálása érdekében. De a vállalkozásomban nagyon nagy mennyiségű adat van, és megpróbáltam az Airflow különböző verzióinak felhasználásával, és az igazán hatalmas adatok miatt csaknem 70000 feladat van egy DAG-n belül. Kipróbáltam az Airflow különféle verzióit, és a legújabb verzió ütemezheti az 5000 feladatokat, de ha többet szeretnénk ütemezni, akkor az ütemező futó állapotban marad a feladatok ütemezése nélkül. Megtaláltam minden problémát, és megvizsgáltam, hogyan lehet megoldani, mi a tényleges oka, és végül pedig ennek a blognak az írása.

Ez az Airflow egyik használati esete, amikor több ezer feladat van egy DAG-ban. Először az Airflow 1.10.3 verziót kell használnunk, utána pedig nem sok feladatra kell összpontosítanunk, tehát az Airflow 1.10.3 verziót kell használnunk. A verzió telepítéséhez kövesse az alábbi lépéseket:

  • Először új környezetet kell létrehoznunk, és aktiválnunk kell azt a következő paranccsal:
conda create -n airflow_3
conda aktiválja a légáramot_3
  • Az Airflow telepítéséhez az 1.10.3 specifikus verziójával használja a következő parancsot:
conda install -c conda-forge légáram = = 1.10.3
  • Bizonyos bizonyos követelményeket kell meggyőződnie arról, hogy ez a verzió nem működik-e a1.0.9 lombikkal, tehát ha van ilyennél nagyobb lombik, használja a következő parancsot:
pip telepítő lombik == 1.0.4
pip install funcsigs == 1.0.0 (Ez egy másik követelmény, amelyet telepíteni kell)
  • Javasoljuk, hogy a Zeller Végrehajtót használja, amikor ezzel a sok nagyszámú feladattal dolgozunk, mivel ezeket a feladatokat párhuzamosítani kell, és ezt a Zeller Végrehajtóval is elérhetjük. A Zeller telepítéséhez használja a következő parancsot:
pip zeller telepítése
  • Munkavállalókat kell használnia, és brókert kell beállítania a zeller-végrehajtóhoz, én a RabbitMQ-t használom brókerként. A bróker beállításához a következő struktúrát használhatja:
broker_url = amqp: // “felhasználónév”: “jelszó” @ ”host_név”: “port” /

például

broker_url = amqp: // vendég: vendég @ localhost: 5672 /
  • A Zeller Végrehajtó felhasználói felületének megtekintéséhez használhatjuk a Virágot, és telepíthetjük a következő parancsot:
conda install -c conda-forge virág
  • Ezt követően meg kell változtatnunk a konfigurációt, hogy párhuzamosan futtasson több ezer feladatot, és több ezer feladatot ütemezhessen egy lövésben.
[mag]
végrehajtó = CeleryExecutor párhuzamosság = 200000 non_pooled_task_slot_count = 100000 dag_concurrency = 100000 max_active_runs_per_dag = 2
[Ütemező]
max_thread = 10 (Használhatja a szálakat a programjának megfelelően növelve vagy csökkentve)

Ezek a fő beállítások, amikor egy lövés alatt több ezer feladatot szeretne ütemezni. Be kell állítania azt, hogy hány maximális DAG-ot kíván futtatni párhuzamosan, és hány feladatot végez egy DAG-ban.

A fő paraméter a „Non_pooled_task_slot_count”, amelyet eltávolítottak az Airflow 1.10.4 verziójából, tehát az 1.10.3 verziót használom, mivel ez a paraméter nagyon fontos szerepet játszik a feladatok ütemezésében.

A fő különbség a „Non_pooled_task_slot_count” eltávolítása után az, hogy az alapértelmezett értékre 128 alapértelmezett értékű base_poolot használ (ez a követelménynek megfelelően növelhető). A „Non_pooled_task_slot_count” fő feladata a feladatok ütemezése, és nincs hozzákapcsolva az alapértelmezett_poolhoz vagy az adatbázisból származó bármely más kapcsolathoz, így növelhetjük ezt a számot, amennyit csak akarunk, de ha a „default_pool” bővíti a résidőket. akkor csatlakozik a meglévő adatbázis-kapcsolatokhoz, és egyszerre nem futhat 100000 adatbázis-kapcsolat. Alapvetően a „Non_pooled_task_slot_count” eltávolításra került az „alapértelmezett_pool” helyett.

Ez a bejegyzés tartalmazza a választ arra a kérdésre, hogy miért akadályozza az ütemezőt, elakadt, nem ütemezett sok feladatot, vagy egész nap fut, anélkül, hogy bármit tenné. Ennek az összes válasznak egy válasz van az Airflow 1.10.3 verziójának használatához.

Az Airflow 1.10.3 használatakor meg kell határoznunk, melyik készletet kell használni a DAG-nak, mivel alapértelmezés szerint nem használja az „default_pool” -t, tehát a feladatok létrehozása során át kell adnunk a para mater pool = 'defautl_pool' -t. Az 'alapértelmezett_pool' felhasználót felhasználói felület (Rendszergazda -> készletek) használatával hozhat létre, vagy parancssorral is elkészíthető:

airflow pool -s default_pool 128 'alapértelmezett pool'.

Íme a példa a DAG mintára:

import os from datetime import datetime, timedelta import airflow from airflow import DAG import airflow.operators.dummy_operator import DummyOperator
default_args = {'tulajdonos': 'Airflow', 'atkarīgs_on_past': Hamis, 'start_date': airflow.utils.dates.days_ago (2), 'retries': 1, 'retry_delay': timedelta (minutes = 1),}
dag = DAG ('dummy_try1', default_args = default_args, ütemterv_intervall = Nincs)
i-re a tartományban (50000): feladatok = DummyOperator (task_id = '{}'. formátum (i), dag = dag, pool = 'default_pool)

Az összes verzió közötti különbséget az alábbi linken ellenőrizheti:

  • https://github.com/apache/airflow/blob/master/UPDATING.md#airflow-1104