From eb5f5e2d0a96c10cbb92c8cf069e75e2783591d4 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Sun, 17 Nov 2024 12:41:37 +0100 Subject: [PATCH] Add taskiq --- poetry.lock | 150 ++++++++++++++++++++++++++++++++++++++++++++- pyproject.toml | 2 + src/core/broker.py | 20 ++++++ src/core/config.py | 2 + 4 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 src/core/broker.py diff --git a/poetry.lock b/poetry.lock index 68baa38..f9200be 100644 --- a/poetry.lock +++ b/poetry.lock @@ -152,6 +152,17 @@ doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphin test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] trio = ["trio (>=0.23)"] +[[package]] +name = "async-timeout" +version = "5.0.1" +description = "Timeout context manager for asyncio programs" +optional = false +python-versions = ">=3.8" +files = [ + {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"}, + {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"}, +] + [[package]] name = "attrs" version = "24.2.0" @@ -413,6 +424,29 @@ files = [ {file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"}, ] +[[package]] +name = "importlib-metadata" +version = "8.5.0" +description = "Read metadata from Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "importlib_metadata-8.5.0-py3-none-any.whl", hash = "sha256:45e54197d28b7a7f1559e60b95e7c567032b602131fbd588f1497f47880aa68b"}, + {file = "importlib_metadata-8.5.0.tar.gz", hash = "sha256:71522656f0abace1d072b9e5481a48f07c138e00f079c38c8f883823f9c26bd7"}, +] + +[package.dependencies] +zipp = ">=3.20" + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +enabler = ["pytest-enabler (>=2.2)"] +perf = ["ipython"] +test = ["flufl.flake8", "importlib-resources (>=1.3)", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] +type = ["pytest-mypy"] + [[package]] name = "mongojet" version = "0.2.4" @@ -586,6 +620,27 @@ files = [ {file = "multidict-6.0.5.tar.gz", hash = "sha256:f7e301075edaf50500f0b341543c41194d8df3ae5caf4702f2095f3ca73dd8da"}, ] +[[package]] +name = "packaging" +version = "24.2" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, + {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, +] + +[[package]] +name = "pycron" +version = "3.0.0" +description = "Simple cron-like parser, which determines if current datetime matches conditions." +optional = false +python-versions = ">=3.5" +files = [ + {file = "pycron-3.0.0.tar.gz", hash = "sha256:b916044e3e8253d5409c68df3ac64a3472c4e608dab92f40e8f595e5d3acb3de"}, +] + [[package]] name = "pydantic" version = "2.9.2" @@ -864,6 +919,24 @@ files = [ {file = "pytz-2024.2.tar.gz", hash = "sha256:2aa355083c50a0f93fa581709deac0c9ad65cca8a9e9beac660adcbd493c798a"}, ] +[[package]] +name = "redis" +version = "5.2.0" +description = "Python client for Redis database and key-value store" +optional = false +python-versions = ">=3.8" +files = [ + {file = "redis-5.2.0-py3-none-any.whl", hash = "sha256:ae174f2bb3b1bf2b09d54bf3e51fbc1469cf6c10aa03e21141f51969801a7897"}, + {file = "redis-5.2.0.tar.gz", hash = "sha256:0b1087665a771b1ff2e003aa5bdd354f15a70c9e25d5a7dbf9c722c16528a7b0"}, +] + +[package.dependencies] +async-timeout = {version = ">=4.0.3", markers = "python_full_version < \"3.11.3\""} + +[package.extras] +hiredis = ["hiredis (>=3.0.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==23.2.1)", "requests (>=2.31.0)"] + [[package]] name = "six" version = "1.16.0" @@ -886,6 +959,62 @@ files = [ {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, ] +[[package]] +name = "taskiq" +version = "0.11.7" +description = "Distributed task queue with full async support" +optional = false +python-versions = "<4.0.0,>=3.8.1" +files = [ + {file = "taskiq-0.11.7-py3-none-any.whl", hash = "sha256:15f741ca03e812724985333a327a2344ab720e7d54daaa932e1d3df7639558da"}, + {file = "taskiq-0.11.7.tar.gz", hash = "sha256:dcb43960de0309b10bda814ce4da3963e532d50c132687a43edffa3f60da440a"}, +] + +[package.dependencies] +anyio = ">=3" +importlib-metadata = "*" +packaging = ">=19" +pycron = ">=3.0.0,<4.0.0" +pydantic = ">=1.0,<=3.0" +pytz = "*" +taskiq_dependencies = ">=1.3.1,<2" +typing-extensions = ">=3.10.0.0" + +[package.extras] +cbor = ["cbor2 (>=5.4.6,<6.0.0)"] +metrics = ["prometheus_client (>=0,<1)"] +msgpack = ["msgpack (>=1.0.7,<2.0.0)"] +orjson = ["orjson (>=3.9.9,<4.0.0)"] +reload = ["gitignore-parser (>=0,<1)", "watchdog (>=2.1.9,<3.0.0)"] +uv = ["uvloop (>=0.16.0,<1)"] +zmq = ["pyzmq (>=23.2.0,<24.0.0)"] + +[[package]] +name = "taskiq-dependencies" +version = "1.5.4" +description = "FastAPI like dependency injection implementation" +optional = false +python-versions = "<4.0.0,>=3.8.1" +files = [ + {file = "taskiq_dependencies-1.5.4-py3-none-any.whl", hash = "sha256:8b10d2635a8ada8774f1b555e0a6d72c4fb5e6089601858d38dd95ff6d214a4c"}, + {file = "taskiq_dependencies-1.5.4.tar.gz", hash = "sha256:04546a5786e0f8cb2e008af19cdf415dd27d63d6d29ccf15eda8fa6b8b6b8006"}, +] + +[[package]] +name = "taskiq-redis" +version = "1.0.2" +description = "Redis integration for taskiq" +optional = false +python-versions = "<4.0.0,>=3.8.1" +files = [ + {file = "taskiq_redis-1.0.2-py3-none-any.whl", hash = "sha256:d24c4ba34560eb882af351ec34eab4fc90ff6b82ef9b3245d0a499d335b92086"}, + {file = "taskiq_redis-1.0.2.tar.gz", hash = "sha256:9a8e8b8e26847e25fbe6e6f8f910632a66f3d38bb5249699193efcfd1e84e25f"}, +] + +[package.dependencies] +redis = ">=5,<6" +taskiq = ">=0.11.1,<1" + [[package]] name = "twitchapi" version = "4.3.1" @@ -1028,7 +1157,26 @@ files = [ idna = ">=2.0" multidict = ">=4.0" +[[package]] +name = "zipp" +version = "3.21.0" +description = "Backport of pathlib-compatible object wrapper for zip files" +optional = false +python-versions = ">=3.9" +files = [ + {file = "zipp-3.21.0-py3-none-any.whl", hash = "sha256:ac1bbe05fd2991f160ebce24ffbac5f6d11d83dc90891255885223d42b3cd931"}, + {file = "zipp-3.21.0.tar.gz", hash = "sha256:2c9958f6430a2040341a52eb608ed6dd93ef4392e02ffe219417c1b28b5dd1f4"}, +] + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +enabler = ["pytest-enabler (>=2.2)"] +test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] +type = ["pytest-mypy"] + [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "bc98a45353566e77b38104fd1dfd6ece7fa5f48823e9ca44181e96ea870d09de" +content-hash = "3724cbb8df43cb95838fe9710c7baf48ba667b605a01ff81054929b7fca1bf9f" diff --git a/pyproject.toml b/pyproject.toml index 7329660..480c046 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,8 @@ httpx = "^0.27.2" icalendar = "^6.0.1" pytz = "^2024.2" mongojet = "^0.2.4" +taskiq = "^0.11.7" +taskiq-redis = "^1.0.2" [build-system] diff --git a/src/core/broker.py b/src/core/broker.py new file mode 100644 index 0000000..9d479f3 --- /dev/null +++ b/src/core/broker.py @@ -0,0 +1,20 @@ +from taskiq import TaskiqScheduler +from taskiq.schedule_sources import LabelScheduleSource +from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend + +from core.config import config + + +broker = ListQueueBroker( + url=config.REDIS_URI, + result_backend=RedisAsyncResultBackend( + redis_url=config.REDIS_URI, + result_ex_time=60 * 60 * 24 * 7, + ), +) + + +scheduler = TaskiqScheduler( + broker=broker, + sources=[LabelScheduleSource(broker)], +) diff --git a/src/core/config.py b/src/core/config.py index 9b2559e..1a92e20 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -18,6 +18,8 @@ class Config(BaseSettings): MONGODB_URI: str + REDIS_URI: str + SECRETS_FILE_PATH: str