BETA, NOT FOR PRODUCTION USE!!!
The core functions work. Websocket connections to public endpoints can be established and are stable. (No long-term tests!)
If you would like to take part in the test, please contact us in the chat!
UNICORN Bybit WebSocket API
Description | Installation | How To | Documentation | Examples | Change Log | Wiki | Social | Notifications | Bugs | Contributing | Disclaimer
A Python SDK by LUCIT to use the Bybit WebSocket APIs (live + testnet) in a simple, fast, flexible, robust and fully-featured way.
Part of 'UNICORN Trading Suite'.
Get help with the integration of the UNICORN Trading Suite modules!
Get a UNICORN Trading Suite License
To run modules of the UNICORN Trading Suite you need a valid license!
Receive Data from Bybit WebSockets
Create a multiplex websocket connection to Bybit with a stream_buffer with just 3 lines of code
from unicorn_bybit_websocket_api import BybitWebSocketApiManager
bybit_wsm = BybitWebSocketApiManager(exchange="bybit.com")
bybit_wsm.create_stream(endpoint="public/linear", channels=['kline.1'], markets=['btcusdt', 'ethusdt'])
And 4 more lines to print out the data
while True:
    oldest_data_from_stream_buffer = bybit_wsm.pop_stream_data_from_stream_buffer()
    if oldest_data_from_stream_buffer:
        print(oldest_data_from_stream_buffer)
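The loop above polls the stream_buffer without pausing, which keeps one CPU core busy while the buffer is empty. A minimal sketch of a gentler polling loop follows. `DemoBuffer` is a hypothetical stand-in for the manager that only mimics `pop_stream_data_from_stream_buffer()`, and the sketch assumes that an empty buffer yields a falsy value, as the `if` check above suggests. The names `drain`, `idle_sleep` and `max_idle_polls` are illustrative and not part of the library.

```python
import time
from collections import deque


class DemoBuffer:
    """Hypothetical stand-in for the manager; only mimics the pop method."""

    def __init__(self, records):
        self._records = deque(records)

    def pop_stream_data_from_stream_buffer(self):
        # Assumption: a falsy value signals an empty buffer.
        return self._records.popleft() if self._records else False


def drain(manager, handle, idle_sleep=0.01, max_idle_polls=3):
    """Pop records until the buffer stayed empty for max_idle_polls polls."""
    idle_polls = 0
    handled = 0
    while idle_polls < max_idle_polls:
        record = manager.pop_stream_data_from_stream_buffer()
        if record:
            handle(record)
            handled += 1
            idle_polls = 0
        else:
            idle_polls += 1
            time.sleep(idle_sleep)  # yield the CPU instead of spinning
    return handled


received = []
count = drain(DemoBuffer(['{"a": 1}', '{"a": 2}']), received.append)
print(count, received)
```

In a long-running program you would probably drop `max_idle_polls` and loop until the manager is stopping; the bound is only there so the sketch terminates.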
Or with a callback function just do
from unicorn_bybit_websocket_api import BybitWebSocketApiManager
def process_new_receives(stream_data):
    print(str(stream_data))
bybit_wsm = BybitWebSocketApiManager(exchange="bybit.com")
bybit_wsm.create_stream(endpoint="public/linear", channels=['kline.1'],
                        markets=['btcusdt', 'ethusdt'],
                        process_stream_data=process_new_receives)
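A callback usually does more than print. The following sketch shows one way a callback could keep the latest close price per symbol. It rests on assumptions that this README does not confirm: that records arrive either as JSON text or as dictionaries, and that kline records carry a `topic` of the form `kline.<interval>.<SYMBOL>` plus a `data` list whose entries have a `close` field. The sample payload is illustrative and was not captured from a live stream.

```python
import json


def parse_record(stream_data):
    """Return the record as a dict, decoding JSON text if necessary."""
    if isinstance(stream_data, (bytes, bytearray)):
        stream_data = stream_data.decode("utf-8")
    if isinstance(stream_data, str):
        return json.loads(stream_data)
    return stream_data


closes = {}  # latest close price per symbol


def process_new_receives(stream_data):
    record = parse_record(stream_data)
    topic = record.get("topic", "")
    # Assumed layout: kline topics look like "kline.<interval>.<SYMBOL>".
    if not topic.startswith("kline."):
        return  # ignore everything else, e.g. subscription replies
    symbol = topic.rsplit(".", 1)[-1]
    for candle in record.get("data", []):
        closes[symbol] = float(candle["close"])


# Illustrative payloads, not captured from a live stream.
sample = '{"topic": "kline.1.BTCUSDT", "data": [{"close": "64000.5"}]}'
process_new_receives(sample)
process_new_receives({"success": True, "op": "subscribe"})
print(closes)
```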
Or with an async callback function just do
from unicorn_bybit_websocket_api import BybitWebSocketApiManager
import asyncio
async def process_new_receives(stream_data):
    print(stream_data)
    await asyncio.sleep(1)
bybit_wsm = BybitWebSocketApiManager()
bybit_wsm.create_stream(endpoint="public/linear", channels=['kline.1'],
                        markets=['btcusdt', 'ethusdt'],
                        process_stream_data_async=process_new_receives)
Or await the stream data in an asyncio coroutine
All the methods of data collection presented have their own advantages and disadvantages. However, this is the generally recommended method for processing data from streams.
from unicorn_bybit_websocket_api import BybitWebSocketApiManager
import asyncio
async def main():
    async def process_asyncio_queue(stream_id=None):
        print(f"Start processing the data from stream '{bybit_wsm.get_stream_label(stream_id)}':")
        while bybit_wsm.is_stop_request(stream_id) is False:
            data = await bybit_wsm.get_stream_data_from_asyncio_queue(stream_id)
            print(data)
            bybit_wsm.asyncio_queue_task_done(stream_id)
    bybit_wsm.create_stream(endpoint="public/linear",
                            channels=['kline.1'],
                            markets=['btcusdt', 'ethusdt'],
                            stream_label="KLINE_1m",
                            process_asyncio_queue=process_asyncio_queue)
    while not bybit_wsm.is_manager_stopping():
        await asyncio.sleep(1)
with BybitWebSocketApiManager(exchange='bybit.com') as bybit_wsm:
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\r\nGracefully stopping ...")
    except Exception as e:
        print(f"\r\nERROR: {e}\r\nGracefully stopping ...")
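The example above pairs every `get_stream_data_from_asyncio_queue()` with an `asyncio_queue_task_done()` call. The sketch below illustrates the same pairing with a plain `asyncio.Queue` from the standard library. It assumes, without confirmation from this README, that the library methods behave like `Queue.get()` and `Queue.task_done()`. The names `consume`, `demo` and `stop_event` are illustrative.

```python
import asyncio


async def consume(queue, handle, stop_event):
    """Process records until stop_event is set and the queue is empty."""
    while not (stop_event.is_set() and queue.empty()):
        try:
            record = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue  # nothing arrived, re-check the stop condition
        try:
            handle(record)
        finally:
            # Mark the record as done even if handle() raised,
            # otherwise queue.join() would wait forever.
            queue.task_done()


async def demo():
    queue = asyncio.Queue()
    stop_event = asyncio.Event()
    seen = []
    consumer = asyncio.create_task(consume(queue, seen.append, stop_event))
    for i in range(3):
        await queue.put({"seq": i})
    await queue.join()  # returns once task_done() was called for every record
    stop_event.set()
    await consumer
    return seen


result = asyncio.run(demo())
print(result)
```

Calling `task_done()` in a `finally` block keeps the bookkeeping correct when the handler fails, which matters if other code waits on `join()`.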
Basically that's it, but there are more options.
Subscribe / unsubscribe new markets and channels
These functions are not ready! (Todo!)
Stop bybit_wsm after usage to avoid memory leaks
When you instantiate UNICORN Bybit WebSocket API in a with statement, bybit_wsm.stop_manager() is automatically executed upon exiting the with block.
with BybitWebSocketApiManager() as bybit_wsm:
    bybit_wsm.create_stream(channels="kline.1", markets="btcusdt", stream_label="KLINE_1m")
Without with, you must explicitly execute bybit_wsm.stop_manager() yourself.
bybit_wsm.stop_manager()
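When you cannot use a with block, a try/finally construct gives the same guarantee that the manager is stopped, even if your code raises. The sketch below demonstrates both variants with `DemoManager`, a hypothetical stand-in that only mimics `stop_manager()` and the context manager protocol. It is not the real BybitWebSocketApiManager.

```python
class DemoManager:
    """Hypothetical stand-in, not the real BybitWebSocketApiManager."""

    def __init__(self):
        self.stopped = False

    def stop_manager(self):
        self.stopped = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_manager()
        return False  # do not swallow exceptions


# Variant 1: the with block stops the manager even if the body raises.
with_manager = DemoManager()
try:
    with with_manager:
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

# Variant 2: without with, try/finally gives the same guarantee.
plain_manager = DemoManager()
try:
    pass  # create streams and process data here
finally:
    plain_manager.stop_manager()

print(with_manager.stopped, plain_manager.stopped)
```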
stream_signals - know the state of your streams
Usually you want to know when a stream is working and when it is not. While a stream is down, your own system is "blind": you may want to close open positions to be on the safe side, treat indicator values as unreliable, or reload the missing data via REST as an alternative.
For this purpose, the UNICORN Bybit WebSocket API provides so-called stream_signals, which are used to tell your code in real time when a stream is connected, when it received its first data record, when it was disconnected and stopped, and when the stream cannot be restored.
from unicorn_bybit_websocket_api import BybitWebSocketApiManager
import time
def process_stream_signals(signal_type=None, stream_id=None, data_record=None, error_msg=None):
    print(f"Received stream_signal for stream '{bybit_wsm.get_stream_label(stream_id=stream_id)}': "
          f"{signal_type} - {stream_id} - {data_record} - {error_msg}")
with BybitWebSocketApiManager(process_stream_signals=process_stream_signals) as bybit_wsm:
    bybit_wsm.create_stream(channels="trade", markets="btcusdt", stream_label="TRADES")
    time.sleep(2)
    print("Waiting 5 seconds and then stop the stream ...")
    time.sleep(5)
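Instead of printing the signals, you can use them to track whether a stream is currently "blind". The following sketch keeps one flag per stream and uses the signal names listed in this README. The class `StreamHealth` and its method `is_blind()` are illustrative helpers, not part of the library. The lock is a precaution based on the assumption that signals may arrive from other threads.

```python
import threading

# Signal names as listed in this README.
KNOWN_SIGNALS = {"CONNECT", "FIRST_RECEIVED_DATA", "DISCONNECT",
                 "STOP", "STREAM_UNREPAIRABLE"}


class StreamHealth:
    """Tracks which streams currently deliver data."""

    def __init__(self):
        self._lock = threading.Lock()
        self._receiving = {}

    def process_stream_signals(self, signal_type=None, stream_id=None,
                               data_record=None, error_msg=None):
        if signal_type not in KNOWN_SIGNALS:
            return  # ignore signals this sketch does not know
        with self._lock:
            # A stream counts as receiving only after its first data record.
            self._receiving[stream_id] = signal_type == "FIRST_RECEIVED_DATA"

    def is_blind(self, stream_id):
        with self._lock:
            return not self._receiving.get(stream_id, False)


health = StreamHealth()
health.process_stream_signals(signal_type="CONNECT", stream_id="s1")
blind_after_connect = health.is_blind("s1")
health.process_stream_signals(signal_type="FIRST_RECEIVED_DATA", stream_id="s1")
blind_after_data = health.is_blind("s1")
health.process_stream_signals(signal_type="DISCONNECT", stream_id="s1",
                              error_msg="simulated")
blind_after_disconnect = health.is_blind("s1")
print(blind_after_connect, blind_after_data, blind_after_disconnect)
```

The bound method `health.process_stream_signals` has the same keyword parameters as the callback above, so it could be passed as the `process_stream_signals` argument of the manager.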
Description
The Python package UNICORN Bybit WebSocket API provides an API to the Bybit WebSocket APIs (live and testnet).
What are the benefits of the UNICORN Bybit WebSocket API?
- Fully managed websockets and 100% auto-reconnect! Also handles maintenance windows!
- No memory leaks from Python version 3.7 to 3.12!
- The full UTS stack is delivered as a compiled C extension for maximum performance.
Exchange | Exchange string | WS | WS API |
---|---|---|---|
Bybit | bybit.com | | |
Bybit Testnet | bybit.com-testnet | | |
- Streams are processed asynchronously/concurrently (Python asyncio) and each stream is started in a separate thread, so you don't need to deal with asyncio in your code! But you can consume with await, if you want!
- Supports subscribe/unsubscribe on all exchanges! (Take a look at the max supported subscriptions per stream in the endpoint configuration overview!)
- UNICORN Bybit WebSocket API respects Bybit's API guidelines and protects you from avoidable reconnects and bans.
- Support for multiple private !userData streams with different api_key and api_secret. (example_multiple_userdata_streams.py)
- Pick up the received data from the stream_buffer (FIFO or LIFO). If you cannot store your data because of a temporary technical issue, you can kick the data back to the stream_buffer, which keeps the received records in RAM until you are able to process them in the normal way again. Learn more!
- Use separate stream_buffers for specific streams or users!
- Watch the stream_signals to receive CONNECT, FIRST_RECEIVED_DATA, DISCONNECT, STOP and STREAM_UNREPAIRABLE signals from your streams! Learn more!
- Get the received data unchanged as received, as Python dictionary or converted with UnicornFy into well-formed Python dictionaries. Use the output parameter of create_stream() to control the output format.
- Helpful management features like clear_asyncio_queue(), clear_stream_buffer(), get_bybit_api_status(), get_current_receiving_speed(), get_errors_from_endpoints(), get_limit_of_subscriptions_per_stream(), get_request_id(), get_result_by_request_id(), get_results_from_endpoints(), get_stream_buffer_length(), get_stream_info(), get_stream_list(), get_stream_id_by_label(), get_stream_statistic(), get_stream_subscriptions(), get_version(), is_update_available(), get_stream_data_from_asyncio_queue(), pop_stream_data_from_stream_buffer(), print_summary(), replace_stream(), set_stream_label(), set_ringbuffer_error_max_size(), subscribe_to_stream(), stop_stream(), unsubscribe_from_stream(), wait_till_stream_has_started() and many more! Explore them here.
- Monitor the status of the created BybitWebSocketApiManager() instance within your code with get_monitoring_status_plain() and specific streams with get_stream_info().
- Available as a package via pip and conda as a precompiled C extension with stub files for improved IntelliSense and with source code for easier debugging. To the installation.
- Integration of test cases and examples.
- Customizable base URL.
- Socks5 Proxy support:
  bybit_wsm = BybitWebSocketApiManager(exchange="bybit.com", socks5_proxy_server="127.0.0.1:9050")
  Read the docs or this how to for more information or try example_socks5_proxy.py.
- Extensively tested on Linux, Mac and Windows on x86, arm32, arm64, ...
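The stream_buffer item in the list above mentions FIFO and LIFO retrieval and kicking data back when it cannot be stored. The sketch below illustrates that idea with a small in-memory buffer built on `collections.deque`. `DemoStreamBuffer` and its methods are hypothetical and do not reflect how the library implements its stream_buffer. In particular, placing a kicked-back record at the front is a design choice of this sketch to preserve the original order.

```python
from collections import deque


class DemoStreamBuffer:
    """Hypothetical in-memory buffer; not the library's implementation."""

    def __init__(self):
        self._records = deque()

    def add(self, record):
        self._records.append(record)

    def pop(self, mode="FIFO"):
        if mode not in ("FIFO", "LIFO"):
            raise ValueError(f"unknown mode: {mode}")
        if not self._records:
            return False  # assumption: falsy value for an empty buffer
        if mode == "FIFO":
            return self._records.popleft()  # oldest record first
        return self._records.pop()  # newest record first

    def kick_back(self, record):
        """Put back a record that could not be processed; FIFO serves it next."""
        self._records.appendleft(record)


buffer = DemoStreamBuffer()
for number in (1, 2, 3):
    buffer.add(number)

newest = buffer.pop(mode="LIFO")   # takes the newest record
first = buffer.pop()               # takes the oldest record
buffer.kick_back(first)            # e.g. the database was unreachable
order = [buffer.pop(), buffer.pop()]
empty = buffer.pop()
print(newest, order, empty)
```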
If you like the project, please star it on GitHub!
Installation and Upgrade
The module requires Python 3.7 or newer and runs smoothly up to and including Python 3.12.
Anaconda packages are available from Python version 3.8 and higher, but only in the latest version!
For the PyPy interpreter we offer packages via PyPI only from Python version 3.9 and higher.
The current dependencies are listed here.
If you run into errors during the installation take a look here.
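If you want to verify the interpreter version from within a script before installing or importing the package, a check along the following lines is one option. The bounds mirror the versions stated in this section. The function name `check_interpreter` and its return values are illustrative.

```python
import sys

MIN_VERSION = (3, 7)   # oldest version named in this README
MAX_TESTED = (3, 12)   # newest version named in this README


def check_interpreter(version_info=None):
    """Classify a Python version as unsupported, supported or untested."""
    if version_info is None:
        version_info = sys.version_info
    version = tuple(version_info[:2])
    if version < MIN_VERSION:
        return "unsupported"
    if version > MAX_TESTED:
        return "untested"
    return "supported"


print(check_interpreter())
```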
Packages are created automatically with GitHub Actions
When a new release is to be created, we start two GitHub Actions:
Both start virtual Windows/Linux/Mac servers provided by GitHub in the cloud with preconfigured environments and create the respective compilations and stub files, pack them into wheels and conda packages and then publish them on GitHub, PyPI and Anaconda. This is a transparent method that makes it possible to trace the source code behind a compilation.
A Cython binary, PyPy or source code based CPython wheel of the latest version with pip from PyPI
Our Cython and PyPy Wheels are available on PyPI. These wheels offer significant advantages for Python developers:
- Performance Boost with Cython Wheels: Cython is a programming language that supplements Python with static typing and C-level performance. By compiling Python code into C, Cython Wheels can significantly enhance the execution speed of Python code, especially in computationally intensive tasks. This means faster runtimes and more efficient processing for users of our package.
- PyPy Wheels for Enhanced Efficiency: PyPy is an alternative Python interpreter known for its speed and efficiency. It uses Just-In-Time (JIT) compilation, which can dramatically improve the performance of Python code. Our PyPy Wheels are tailored for compatibility with PyPy, allowing users to leverage this speed advantage seamlessly.
Both Cython and PyPy Wheels on PyPI make the installation process simpler and more straightforward. They ensure that you get the optimized version of our package with minimal setup, allowing you to focus on development rather than configuration.
On Raspberry Pi and other architectures for which there are no pre-compiled versions, the package can still be installed with pip, which then compiles the package locally on the target system during installation. Please be patient, this may take some time!
Installation
pip install unicorn-bybit-websocket-api
Update
pip install unicorn-bybit-websocket-api --upgrade
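To confirm which version ended up in your environment after an installation or update, you can query the package metadata from Python. This is a small sketch using `importlib.metadata` from the standard library, which requires Python 3.8 or newer. The helper name `installed_version` is illustrative.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version string, or None if it is not installed."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("unicorn-bybit-websocket-api")
print(version if version else "not installed")
```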
A Conda Package of the latest version with conda from Anaconda
The unicorn-bybit-websocket-api package is also available as a Cython version for the linux-64, osx-64 and win-64 architectures with Conda through the lucit channel.
For optimal compatibility and performance, it is recommended to source the necessary dependencies from the conda-forge channel.
Installation
conda config --add channels conda-forge
conda config --add channels lucit
conda install -c lucit unicorn-bybit-websocket-api
Update
conda update -c lucit unicorn-bybit-websocket-api
From source of the latest release with PIP from GitHub
Linux, macOS, ...
Run in bash:
pip install https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/archive/$(curl -s https://api.github.com/repos/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/releases/latest | grep -oP '"tag_name": "\K(.*)(?=")').tar.gz --upgrade
Windows
Use the below command with the version (such as 0.1.0) you determined here:
pip install https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/archive/0.1.0.tar.gz --upgrade
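The bash one-liner above relies on curl and on grep with the -P option, which may not be available on every system. As a portable alternative, the release tag can be extracted with Python. The sketch below only covers the parsing step and builds the archive URL in the same form as the commands above. The sample JSON is illustrative, and fetching the real response from the GitHub API (for example with `urllib.request`) is left out so that the example runs offline. The name `archive_url_from_release` is hypothetical.

```python
import json

ARCHIVE_URL = ("https://github.com/LUCIT-Systems-and-Development/"
               "unicorn-bybit-websocket-api/archive/{tag}.tar.gz")


def archive_url_from_release(release_json):
    """Build the source archive URL from a 'latest release' API response."""
    tag = json.loads(release_json)["tag_name"]
    return ARCHIVE_URL.format(tag=tag)


# Illustrative response body, reduced to the field that is needed.
sample = '{"tag_name": "0.1.0", "name": "demo"}'
url = archive_url_from_release(sample)
print(f"pip install {url} --upgrade")
```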
From the latest source (dev-stage) with PIP from GitHub
This is not a release version and can not be considered to be stable!
pip install https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/tarball/master --upgrade
Change Log
https://unicorn-bybit-websocket-api.docs.lucit.tech/changelog.html
Documentation
Examples
Howto
Project Homepage
https://www.lucit.tech/unicorn-bybit-websocket-api.html
Wiki
https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/wiki
Social
- Discussions
- Gitter
- https://t.me/unicorndevs
- Telegram - English API Community
- Telegram - Chinese API Community
- Discord
Receive Notifications
To receive notifications on available updates you can watch the repository on GitHub, write your own script using is_update_available() or use the monitoring API service.
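If you write your own script, the core of an update check is a comparison of two version strings. The sketch below shows such a comparison under the assumption that versions are purely numeric and dot-separated, such as 0.1.0. It does not call the library and is not how is_update_available() is implemented. The function names are illustrative.

```python
def parse_version(text):
    """Turn '0.1.0' or 'v0.1.0' into a tuple of integers."""
    return tuple(int(part) for part in text.strip().lstrip("v").split("."))


def update_available(installed, latest):
    """Return True if latest is newer than installed."""
    return parse_version(latest) > parse_version(installed)


print(update_available("0.1.0", "0.1.1"))
```

Comparing tuples of integers avoids the pitfall of comparing strings, where "0.10.0" would sort before "0.9.9".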
Follow us on LinkedIn, X or Facebook!
To receive news (like inspection windows/maintenance) about the Bybit APIs, subscribe to their Telegram groups:
How to report Bugs or suggest Improvements?
List of planned features - click if you need one of them or suggest a new feature!
Before you report a bug, try the latest release. If the issue still exists, provide the error trace, OS and Python version and explain how to reproduce the error. A demo script is appreciated.
If you don't find an issue related to your topic, please open a new issue!
Contributing
UNICORN Bybit WebSocket API is an open source project that welcomes contributions, from simple documentation fixes and reports of dead links to new features. To contribute, follow this guide.
Contributors
We love open source!
Disclaimer
This project is for informational purposes only. You should not construe this information or any other material as legal, tax, investment, financial or other advice. Nothing contained herein constitutes a solicitation, recommendation, endorsement or offer by us or any third party provider to buy or sell any securities or other financial instruments in this or any other jurisdiction in which such solicitation or offer would be unlawful under the securities laws of such jurisdiction.
If you intend to use real money, use it at your own risk!
Under no circumstances will we be responsible or liable for any claims, damages, losses, expenses, costs or liabilities of any kind, including but not limited to direct or indirect damages for loss of profits.
Do you have any questions? We will be happy to answer them in our chat - usually you will receive an answer within a few minutes during our opening hours.