Python - очередь с приоритетом
В статье разбираем практический случай создания очереди с сортировкой по приоритету.
Простая очередь.
Самый просто пример использования очереди - это поэтапная обработка ссылок на скачиваение файлов. Для этого достаточно воспользоваться простейшим списком list куда можно закидывать любой объект c помощью метода .append(). В действительности list не является настоящей очередью (но сейчас это не важно). Получить объект содержащийся в такой “очереди” так же просто, для этого есть метод .pop(). При этом стоит помнить, что метод .pop() этот элемент удаляет навсегда.
queue = [] item1 = "https://www.example.com/001.zip" item2 = "https://www.example.com/002.zip" item3 = "https://www.example.com/003.zip" # Закинем в очередь объекты queue.append(item1) queue.append(item2) queue.append(item3) # Получаем из очереди объекты queue.pop() # Выведет: queue.pop() # "https://www.example.com/003.zip" queue.pop() # "https://www.example.com/002.zip" queue.pop() # "https://www.example.com/001.zip"
Вроде всё просто, но в данном случае простота не значит гибкость. Вот основные проблемы:
- нет автоматической сортировки, элементы из списка можно получить с конца в начало (если вызывать
.pop(0)- с начала в конец); - при применении метода
.pop()к списку не содержащему элементов возникает исключениеIndexErrorкоторое нужно обработать. - нет гарантии что вы работаете однозначно с одним и тем же объектом
queue
Реализация очереди с приоритетом.
Для избежания недостатков простого list предлагаю вооружится встроенным модулем heapq (куча) и паттерном Синглтон.
Почему и зачем использовать модуль heapq? Всё просто, этот модуль может просто и быстро отсортировать объекты по приоритету.
Паттерн Синглтон здесь будет использоваться для гарантированной работы только с одним экземпляром объекта Queue.
Весь код класса Queue приведён ниже:
import heapq
class Queue:
def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super(Queue, cls).__new__(cls)
return cls.instance
def __init__(self):
self.queue = []
heapq.heapify(self.queue)
self.index = 0
def __str__(self):
return f"Last index element in queue: {self.index}."
def __repr__(self):
return f"Queue instance contains {self.queue.__len__()} elements."
def push(self, item, priority=False):
priority_level = 1
if priority:
priority_level = -1
entry = (priority_level, self.index, item)
heapq.heappush(self.queue, entry)
self.index = self.index + 1
def pull(self):
entry = heapq.heappop(self.queue)
return entry[2]
# Использование:
scheduler = Queue()
item1 = "https://www.example.com/001.zip"
item2 = "https://www.example.com/002.zip"
item3 = "https://www.example.com/003.zip"
scheduler.push(item1)
scheduler.push(item2, priority=True)
scheduler.push(item3)
# Вывод:
str(scheduler) # Last index element in queue: 2.
repr(scheduler) # Queue instance contains 3 elements.
scheduler.pull() # "https://www.example.com/002.zip"
scheduler.pull() # "https://www.example.com/001.zip"
scheduler.pull() # "https://www.example.com/003.zip"А теперь подробнее разберём код. Для начала импортируем необходимый модуль heapq :
import heapq
Дальше создадим класс Queue и добавим в него конструктор __new__ и инициализатор __init__:
class Queue:
def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super().__new__(cls)
return cls.instanceЗдесь я подробнее остановлюсь на конструкторе __new__ (это и будет наша реализация паттерна Синглтон):
Для дальнейшего создания любого количества экземпляров класса Queue которые будут гарантировано ссылаться только на один объект Queue в памяти - можно воспользоваться “волшебным” методом __new__ который инициализируется в самом начале создания экземпляра класса.
- С помощью метода
hasattr()сделаем проверку на отсутствие аттрибутаinstanceу вновь создаваемого экземпляра классаQueue - Если
instanceнет - создадим аттрибутinstanceи присвоим ему результат выполненияsuper().__new__(cls). - Если
instanceв экземпляре есть - просто вернём содержимоеinstance - Магия тут кроется в том что результатом первого вызова
super().__new__(cls)будет создан экземпляр классаQueue, а все последующие попытки создать экземпляры классаQueueвынужденно будут возвращать результат первого вызоваsuper()__new__(cls).
def __init__(self):
self.queue = []
heapq.heapify(self.queue)
self.index = 0В котором мы создадим объект self.queue - очередь с пустым списком list . Для того чтобы превратить его в “кучу” heap просто передадим self.queue в качестве параметра в функцию heapify(). Далее создадим объект с информацией о начальном индексе self.index. По умолчанию индекс = 0.
Следующие методы __str__ и __repr__ имеют практическую ценность в случае отладки нашего кода (но они не обязательны):
def __str__(self):
return f"Last index element in queue: {self.index - 1}."
def __repr__(self):
return f"Queue instance contains {self.queue.__len__()} elements."Метод __str__ будет выводить индекс присвоенный объекту в нашей очереди, который был добавлен последним. Метод __repr__ будет информировать об общем количестве элементов в очереди.
Метод push() отвечает за добавление объекта в очередь:
def push(self, item: Any, priority=False):
priority_level = 1
if priority:
priority_level = -1
entry = (priority_level, self.index, item)
heapq.heappush(self.queue, entry)
self.index = self.index + 1entry это кортеж с тремя элементами: приоритет, индекс, и сам объект. У каждого объекта помещаемого в очередь задается приоритет. Перед каждым помещением объекта в очередь приоритет сбрасывается на 1. Если priority=True тогда объект в очередь помещается с приоритетом = -1. Теперь этот элемент будет в самом начале очереди, а значит наш метод pull() выдаст его первым. Происходит это потому, что heapq сортирует очередь исходя из “веса” элемента, с учётом его расположения. В случае наличия в очереди нескольких элементов с одинаковым приоритетом, сортировка будет осуществлятся по “весу” второго элемента - индекса. После добавления объекта в очередь, увеличиваем индекс на 1 пункт.
Метод pull() отвечает за возврат объекта из очереди, с учётом приоритета:
def pull(self):
entry = heapq.heappop(self.queue)
return entry[2]Метод .heappop() берёт из очереди объект у которого самое нижнее значение приоритета и номер индекса и выдаёт нам только последний (третий) элемент. Не забываем что нумерация идёт от 0.
Реализация очереди поддерживающей работу с потоками.
Код который мы написали выше не подходит для работы с множеством потоков (используя модуль threading). В случае использования потоков, где один пишет сообщение в очередь, а второй забирает это сообщение из очереди и обрабатывает его, может возникнуть блокировка очереди для одного из потоков, которая может привести к возникновению исключения. Для исправления проблемы понадобиться заменить модуль heapq на queue и изменить несколько строк в нашем классе Queue .
- удаляем импорт модуля
heapqи добавляем импорт классаPriorityQueueиз модуляqueue self.queue = []меняем наself.queue = PriorityQueue(), для создание экземпляра классаPriorityQueue- удаляем строчку
heapq.heapify(self.queue) - в методе
__repr__()меняемself.queue.__len__()наself.queue._qsize() heapq.heappush(self.queue, entry)меняем наself.queue.put(entry)- в методе
.pull()строчкуentry = heapq.heappop(self.queue)меняем на
if self.queue._qsize() == 0:
return None
else:
entry = self.queue.get()Пояснение: в случае если количество элементов в очереди будет равно 0 self.queue.get() будет выполнятся вечно, т.к. будет ожидать элемент из очереди. Для предотвращения такого поведения делаем проверку.
Итак, весь код выглядит вот так:
from queue import PriorityQueue
class Queue:
def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super().__new__(cls)
return cls.instance
def __init__(self):
self.queue = PriorityQueue()
self.index = 0
def __str__(self):
return f"Last index element in queue: {self.index}"
def __repr__(self):
return f"Queue instance contains {self.queue._qsize()} elements."
def push(self, item, priority=False):
priority_level = 1
if priority:
priority.level = -1
entry = (priority_level, self.index, item)
self.queue.put(entry)
self.index = self.index + 1
def pull(self):
if self.queue._qsize() == 0:
return None
else:
entry = self.queue.get()
return entry[2]Таким образом наш класс Queue теперь стал поддерживать работу с потоками.
from threading import Thread
scheduler = Queue()
def generate_links(item):
for i in range(item):
scheduler.push(i)
# добавим одну приоритетную
scheduler.push(item+1, priority=True)
def print_links():
n=0
while n is not None:
n=scheduler.pull()
print(f"Item in scheduler = {n}")
first_thread = Thread(target=generate_links, name='Generator', args=(5,))
seconf_thread = Thread(target=print_links, name='Printer')
first_thread.start()
str(scheduler) # Last index element in queue: 5.
repr(scheduler) # Queue instance contains 6 elements.
seconf_thread.start()
# Вывод:
# Item in schedule = 6
# Item in schedule = 0
# Item in schedule = 1
# Item in schedule = 2
# Item in schedule = 3
# Item in schedule = 4
# Item in schedule = None ## None - очередь пуста