"Enter"a basıp içeriğe geçin

Flask Uygulamalarında Redis Queue ile Asenkron Task Yönetimi

Zaman alıcı, karmaşık veya yoğun kaynak gerektiren görevleri yerine getiren bir uygulama geliştirirken, geliştirdiğimiz uygulamanın kullanıcılara cevap verme hızını da düşünmemiz gerekir. Şayet uygulama çalışırken geçen süre, kullanıcılar için de oldukça sinir bozucu olabilir. Hatta işlemin tamamlanmasını beklerken, HTTP sunucusu zaman aşımına bile uğrayabilir.

Burada uygulamamızın sayfayı oluştururken yaptığı her işlemin, sayfayı oluşturmak için gerekli olmadığını anlamamız gerekiyor. E-Posta işlemleri, farklı bir API ile iletişim, büyük veri işleme gibi çeşitli işlemler sayfayı oluşturmak için gerekli değildir. Bu işlemler asenkron görevler (asynchronous tasks) ile yapılabilir, böylece bu işlemlerin tamamlanmasını beklemeden sayfa servis edilebilir.

Bu noktada karşımıza Celery, RabbitMQ, Apache Kafka ve Redis Queue gibi çeşitli mesaj kuyruğu teknolojileri çıkıyor. Bu teknolojiler, işlemleri bir kuyruğa alarak sırayla çalıştırabilmemize olanak sağlıyor. Böylece asenkron olarak süreci tamalayabiliyoruz. Bu yazıda, diğerlerine nazaran daha basit olan Redis Queue teknolojisini inceleyeceğiz.

Redis, Redis Queue ve RQ Dashboard

Redis, açık kaynak kodlu “key-value” yani “anahtar-değer” deposudur. Çeşitli veri yapılarını içerisinde barındırabilir. Böylece veri tabanına ihtiyaç duymadan çeşitli küçük uygulamalar geliştirebilirsiniz. Kurulum ve kullanım detayları için daha önce yazdığım “Python Redis Server Kullanımı” isimli yazıya göz atabilirsiniz.

Redis Queue ise işleri sıraya almak ve arka planda işlemek için geliştirilmiş basit bir Python kitaplığıdır. En yakın rakibi Celery ile kıyaslandığında oldukça basittir. Ancak basit işlemler için güzel ve hızlı çözümler üretir diyebiliriz.

RQ Dashboard ise RQ işlemlerini izlemek için küçük bir monitoring aracıdır. İşlemleri web ve CLI üzerinden takip etmemize olanak sağlar. RQ Dashboard tamamen keyfidir, kullanıp kullanmamak size kalmış bir durum.

Kurulum, Temel Ayarlar ve Temel İşlemler

İşlemlere başlamadan önce Redis Server kurulumu yapmamız gerekiyor, yukarıdaki linkte kurulum detaylarına yer verdim. Örnek proje olarak ise Demongo projesini kullanacağım. Aşağıdaki linkten projeyi indirip kullanmaya başlayabilirsiniz.

Demongo: https://github.com/emregeldegul/demongo

Öncelikle requirements.txt dosyasının içerisine kullanacağımız paketleri ekliyoruz.

rq
Flask-RQ2
rq-dashboard  # Bu kütüphane tercihe bağlıdır 
fakeredis  # Test ortamında redis kullanmak için

Şimdi pip install -r requirements.txt diyerek bağımlılıklarımızı yükleyebiliriz.

Daha sonra settings.py içerisine gerekli ayarları giriyoruz.

from os import getenv, path, urandom
from dotenv import load_dotenv


class Settings:
    basedir = path.abspath(path.dirname(__file__))
    ...

    # Redis, Redis Queue and RQ Dashboard Settings
    REDIS_URL = getenv("REDIS_URL", "redis://localhost:6379/0")
    RQ_REDIS_URL = getenv("RQ_REDIS_URL", REDIS_URL)
    RQ_DASHBOARD_REDIS_URL = getenv("RQ_DASHBOARD_REDIS_URL", REDIS_URL)
    RQ_ASYNC = getenv("RQ_ASYNC", True)  # Test ortamında "False" girin.
    RQ_CONNECTION_CLASS = "fakeredis.FakeStrictRedis"  # Test ortamı için.


settings = Settings()

10. 11. ve 12. satır Redis, Redis Queue ve RQ Dashboard için connection bilgilerini içeriyor. Hepsini şimdilik aynı girebiliriz. RQ_ASYNC ise asenkron olarak süreci ilerletip ilerletmeyeceğimizi belirliyor. Test ortamında bunu False olarak ayarlayabilirsiniz, böylece redis araya girmeyecek ve testlerde gecikme olmayacaktır. RQ_CONNECTION_CLASS ise yine test ortamında fake redis server başlatarak süreci ilerletecektir. Yani 13. ve 14. satırlar test ortamı için diyebiliriz. Bu kısmı canlıda değiştirmeyi unutmayın.

Son olarak uygulama fabrikamız içerisinde modülleri yapılandırma işlemleri kaldı. app/__init__.py içerisine modülü ekliyoruz.

from flask import Flask, render_template
...
from flask_rq2 import RQ
import rq_dashboard


...
rq2 = RQ()

def create_app():
    app = Flask(__name__)
    app.config.from_object(settings)

    ...
    rq2.init_app(app)
    ...

    app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")

    return app

Hepsi bu kadar, bir terslik olmadıysa uygulamamıza Redis Queue ve RQ Dashboard entegrasyonunu tamamlamış olduk. Şimdi test amacı ile basit bir task hazırlayalım, amacı ortalama almak olan küçük bir program. helpers klasörü içerisine test_helper.py isminde bir dosya oluşturuyoruz ve taski hazırlıyoruz.

from app import rq2

@rq2.job
def average(x, y):
    print("Hey! I am running!")
    return (x + y) / 2

Taski çalıştırmak için terminalden flask shell komutunu vererek Flask arayüzüne erişiyoruz. Daha sonra taski içeri aktarıp bir kaç küçük deneme yapıyoruz.

>>> from app.helpers.test_helper import avarege
>>> avarege(5, 10)
I Am Running
7.5
>>> task = avarege.queue(5, 10)
>>> task
FlaskJob('3bab5e18-e633-4db5-ade1-7f1cdb929db1', enqueued_at=datetime.datetime(2022, 7, 15, 17, 18, 30, 990950))
>>>

Gördüğünüz gibi 2. satırda fonksiyonu normal bir şekilde çalıştırdığımızda bize sonucu hemen döndü. Ancak queue kullanarak kuyruğa eklediğimizde bir task oluşturdu. Atadığımız nesneyi çalıştırınca bize task id ve kuyruğa eklenme zamanını döndü.

Burada “3bab5e18-e633-4db5-ade1-7f1cdb929db1” task id oluyor. İsterseniz terminalde task.id diyerekte ID numarasına erişebiliriz.

task.id  # Task id döner (string)
task.is_canceled  # taskin iptal edilip edilmediğini döner (boolean)
task.is_finished  # taskin bitip bitmediğini döner (boolean)
dir(task)  # daha fazla kullanılabilir nesne listesi döner (list)

Projede /rq dizinine gittiğimizde bu taskin çalışmayı beklediğini görebiliriz.

RQ Dashboard Örneği

Gördüğünüz gibi task sırada bekliyor. Çalışmama sebebi ise basit, daha “worker” dediğimiz işçileri ayağa kaldırmadık. Farklı bir terminal penceresinden flask rq worker diyerek bir worker’ı ayağa kaldırıyoruz.

~$ flask rq worker
20:24:24 Worker rq:worker:4e901140230e417ebd85d13968dc9c0d: started, version 1.10.1
20:24:24 Subscribing to channel rq:pubsub:4e901140230e417ebd85d13968dc9c0d
20:24:24 *** Listening on default...
20:24:24 Cleaning registries for queue: default
20:24:24 default: app.helpers.test_helper.avarege(5, 10) (3bab5e18-e633-4db5-ade1-7f1cdb929db1)
I Am Running
20:24:24 default: Job OK (3bab5e18-e633-4db5-ade1-7f1cdb929db1)
20:24:24 Result is kept for 500 seconds

Gördüğünüz gibi worker ayağa kalkar kalmaz sırada bekleyen işleri tamamladı. Şimdi diğer terminalden taski çağırarak görevin tamamlanıp tamamlanmadığını görebiliriz. Task kontrolünü yapalım.

>>> from app import rq2
>>> task = rq2.get_queue().fetch_job("3bab5e18-e633-4db5-ade1-7f1cdb929db1")
>>> task.is_finished
True
>>> task.result
1.5

Evet, taskimiz görevini gayet güzel bir şekilde yerine getirdi diyebiliriz. Şimdi işin farklı bir yönünü ele alalım.

Elimizde birden fazla task olduğunu varsayalım -ki doğal olarak olacaktır-. Bu tasklari farklı kuyruklar arasında gruplayabilirsiniz. Böylece farklı işçilerin farklı kuyruklarda çalışmasını sağlayabilirsiniz. Bunun için hangi kuyrukta hangi taskin çalışacağını belirtmeniz yeterli olacaktır.

from app import rq2

...

@rq2.job("low", timeout=60)
def add(x, y):
    print("Hey! I am running!")
    return x + y


@rq2.job("high", timeout=30)
def say_hello(name):
    print("Hey! I am running!")
    return f"Hello, {name}!"

Artık farklı worker’lar farklı kuyruklarda çalışabilir. Bunun için job dekoratörüne ilk argüman olarak kuyruk ismini vermeniz yeterli. Ben low ve high isminde iki kuyruğa ayrı ayrı taskler verdim. Ayrıca timeout ise zaman aşım süresini belirtir. Terminalden flask rq worker low komutu verilirse eğer worker low kuyruğunu işlemeye başlayacaklardır. İstenirse task çağrılırken de kuyruk belirtilebilir.

task2 = add.queue(3, 4, queue="high", timeout=60*2)

Gördüğünüz gibi task (kendi dokümanlarında job olarak geçer) oluşturmak işte bu kadar kolay.

İş Zamanlama (Scheduling Jobs)

RQ ile isterseniz eğer zamanlı görevler (cron gibi) oluşturabilirsiniz. Bunun için zaman ayarını yapmanız yeterli olacaktır.

from datetime import datetime, timedelta

from app import rq2

...

average.schedule(timedelta(seconds=60), 1, 2)  # 60 saniyede bir
say_hello.schedule(datetime(2022, 4, 25, 11, 59, 59), "emre")  # UTC

Şimdi zamanlanmış görevleri başlatmak için terminalden flask rq scheduler komutunu verebiliriz. Hepsi bu kadar!

Asenkron E-Posta Gönderimi

RQ projeye nasıl entegre edilir, bunu detaylıca gördük. Şimdi gerçekten işe yarayacak bir geliştirme yapalım. E-Posta gönderim işlemlerini asenkron çalışacak şekilde düzenleyelim. Böylece kaydolan kullanıcılar uygulamanın e-posta göndermesini beklemeden kayıtlarını tamamlayabilecekler. Hemen app/helpers/mail_helper.py dosyasını açıyoruz ve içini aşağıdaki şekilde düzenliyoruz.

from flask_mail import Message

from app import mail
from settings import settings
from app import rq2


class MailHelper:
    def __init__(self):
        self.sender = settings.MAIL_USERNAME

    @rq2.job
    def send_mail(self, title: str, message: str, recipients: list) -> bool:
        msg = Message(title, sender=self.sender, recipients=recipients)
        msg.body = message
        mail.send(msg)

        return True

Burada farklı olarak sadece 5. satırda rq2 nesnesini içeri aktardık ve 12. satırda send_mail fonksiyonunu asenkron hale getirmek için @rq2.job dekoratörünü ekledik. Şimdi kayıt esnasında maili gönderecek düzenlemeyi yapıyoruz. app/routes/auth.py dosyasını aşağıdaki şekilde düzenliyoruz.

from flask import Blueprint, redirect, url_for, render_template, flash
from flask_login import current_user, login_user, logout_user

from app.models.user import User
from app.forms.auth import LoginForm, RegisterForm
from app.helpers.mail_helper import MailHelper

auth = Blueprint("auth", __name__, url_prefix="/auth")

...


@auth.route("/register", methods=["GET", "POST"])
def register():
    if current_user.is_authenticated:
        return redirect(url_for("main.index"))

    form = RegisterForm()

    if form.validate_on_submit():
        user = User()
        user.first_name = form.first_name.data
        user.last_name = form.last_name.data
        user.email = form.email.data
        user.generate_password_hash(form.password.data)
        user.save()

        login_user(user)

        mail_helper = MailHelper()
        mail_helper.send_mail.queue("Kayıt Tamam", "Hoş Geldiniz", [user.email])

        return redirect(url_for("main.index"))

    return render_template("views/auth/register.html", title="Register", form=form)

...

30. ve 31. satırla mail gönderecek kodumuzu asenkron olarak ekledik. Projeyi flask run diyerek ayağa kaldırırken farklı bir terminalden flask rq worker diyerek işçimizide ayağa kaldırıyoruz. Şimdi projeye bir üyelik oluşturalım. Eğer mail ve RQ ayarlarını doğru girdiyseniz, üyelik aşamasında mail kuyruğa alınacak ve mail gönderiminin tamamlanması beklenmeden sayfa oluşturulmuş olacak.

Böylece, Redis Queue entegrasyonunun sonuna gelmiş olduk. Elimden geldikçe net bir şekilde anlatmaya çalıştım. Faydalı olması dileklerim ile.

Tek Yorum

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir