May 16, 2022

Python - очередь с приоритетом

В статье разбираем практический случай создания очереди с сортировкой по приоритету.

Простая очередь.

Самый просто пример использования очереди - это поэтапная обработка ссылок на скачиваение файлов. Для этого достаточно воспользоваться простейшим списком list куда можно закидывать любой объект c помощью метода .append(). В действительности list не является настоящей очередью (но сейчас это не важно). Получить объект содержащийся в такой “очереди” так же просто, для этого есть метод .pop(). При этом стоит помнить, что метод .pop() этот элемент удаляет навсегда.

Пример кода:

queue = []

item1 = "https://www.example.com/001.zip"
item2 = "https://www.example.com/002.zip"
item3 = "https://www.example.com/003.zip"

# Закинем в очередь объекты
queue.append(item1)
queue.append(item2)
queue.append(item3)

# Получаем из очереди объекты
queue.pop()
# Выведет:
queue.pop() # "https://www.example.com/003.zip"
queue.pop() # "https://www.example.com/002.zip"
queue.pop() # "https://www.example.com/001.zip"

Вроде всё просто, но в данном случае простота не значит гибкость. Вот основные проблемы:

  1. нет автоматической сортировки, элементы из списка можно получить с конца в начало (если вызывать .pop(0) - с начала в конец);
  2. при применении метода .pop() к списку не содержащему элементов возникает исключение IndexError которое нужно обработать.
  3. нет гарантии что вы работаете однозначно с одним и тем же объектом queue

Реализация очереди с приоритетом.

Для избежания недостатков простого list предлагаю вооружится встроенным модулем heapq (куча) и паттерном Синглтон.

Почему и зачем использовать модуль heapq? Всё просто, этот модуль может просто и быстро отсортировать объекты по приоритету.

Паттерн Синглтон здесь будет использоваться для гарантированной работы только с одним экземпляром объекта Queue.

Весь код класса Queue приведён ниже:

import heapq


class Queue:
    def __new__(cls):
        if not hasattr(cls, 'instance'):
            cls.instance = super(Queue, cls).__new__(cls)
        return cls.instance
        
    def __init__(self):
        self.queue = []
        heapq.heapify(self.queue)
        self.index = 0
    
    def __str__(self):
        return f"Last index element in queue: {self.index}."
    
    def __repr__(self):
        return f"Queue instance contains {self.queue.__len__()} elements."
        
    def push(self, item, priority=False):
        priority_level = 1
        if priority:
            priority_level = -1
        entry = (priority_level, self.index, item)
        heapq.heappush(self.queue, entry)
        self.index = self.index + 1
        
    def pull(self):
        entry = heapq.heappop(self.queue)
        return entry[2]
   
     
# Использование:

scheduler = Queue()

item1 = "https://www.example.com/001.zip"
item2 = "https://www.example.com/002.zip"
item3 = "https://www.example.com/003.zip"

scheduler.push(item1)
scheduler.push(item2, priority=True)
scheduler.push(item3) 

# Вывод:
str(scheduler)   # Last index element in queue: 2.
repr(scheduler)  # Queue instance contains 3 elements.
scheduler.pull() # "https://www.example.com/002.zip"
scheduler.pull() # "https://www.example.com/001.zip"
scheduler.pull() # "https://www.example.com/003.zip"

А теперь подробнее разберём код. Для начала импортируем необходимый модуль heapq :

import heapq

Дальше создадим класс Queue и добавим в него конструктор __new__ и инициализатор __init__:

class Queue:
    def __new__(cls):
        if not hasattr(cls, 'instance'):
            cls.instance = super().__new__(cls)
        return cls.instance

Здесь я подробнее остановлюсь на конструкторе __new__ (это и будет наша реализация паттерна Синглтон):

Для дальнейшего создания любого количества экземпляров класса Queue которые будут гарантировано ссылаться только на один объект Queue в памяти - можно воспользоваться “волшебным” методом __new__ который инициализируется в самом начале создания экземпляра класса.

Итак, следите за руками:

  1. С помощью метода hasattr() сделаем проверку на отсутствие аттрибута instance у вновь создаваемого экземпляра класса Queue
  2. Если instance нет - создадим аттрибут instance и присвоим ему результат выполнения super().__new__(cls) .
  3. Если instance в экземпляре есть - просто вернём содержимое instance
  4. Магия тут кроется в том что результатом первого вызова super().__new__(cls) будет создан экземпляр класса Queue , а все последующие попытки создать экземпляры класса Queue вынужденно будут возвращать результат первого вызова super()__new__(cls) .

Дальше идёт метод __init__ :

def __init__(self):
    self.queue = []
    heapq.heapify(self.queue)
    self.index = 0

В котором мы создадим объект self.queue - очередь с пустым списком list . Для того чтобы превратить его в “кучу” heap просто передадим self.queue в качестве параметра в функцию heapify(). Далее создадим объект с информацией о начальном индексе self.index. По умолчанию индекс = 0.

Следующие методы __str__ и __repr__ имеют практическую ценность в случае отладки нашего кода (но они не обязательны):

def __str__(self):
    return f"Last index element in queue: {self.index - 1}."
    
def __repr__(self):
    return f"Queue instance contains {self.queue.__len__()} elements."

Метод __str__ будет выводить индекс присвоенный объекту в нашей очереди, который был добавлен последним. Метод __repr__ будет информировать об общем количестве элементов в очереди.

Метод push() отвечает за добавление объекта в очередь:

def push(self, item: Any, priority=False):
    priority_level = 1
    if priority:
        priority_level = -1
    entry = (priority_level, self.index, item)
    heapq.heappush(self.queue, entry)
    self.index = self.index + 1

entry это кортеж с тремя элементами: приоритет, индекс, и сам объект. У каждого объекта помещаемого в очередь задается приоритет. Перед каждым помещением объекта в очередь приоритет сбрасывается на 1. Если priority=True тогда объект в очередь помещается с приоритетом = -1. Теперь этот элемент будет в самом начале очереди, а значит наш метод pull() выдаст его первым. Происходит это потому, что heapq сортирует очередь исходя из “веса” элемента, с учётом его расположения. В случае наличия в очереди нескольких элементов с одинаковым приоритетом, сортировка будет осуществлятся по “весу” второго элемента - индекса. После добавления объекта в очередь, увеличиваем индекс на 1 пункт.

Метод pull() отвечает за возврат объекта из очереди, с учётом приоритета:

def pull(self):
    entry = heapq.heappop(self.queue)
    return entry[2]

Метод .heappop() берёт из очереди объект у которого самое нижнее значение приоритета и номер индекса и выдаёт нам только последний (третий) элемент. Не забываем что нумерация идёт от 0.

Реализация очереди поддерживающей работу с потоками.

Код который мы написали выше не подходит для работы с множеством потоков (используя модуль threading). В случае использования потоков, где один пишет сообщение в очередь, а второй забирает это сообщение из очереди и обрабатывает его, может возникнуть блокировка очереди для одного из потоков, которая может привести к возникновению исключения. Для исправления проблемы понадобиться заменить модуль heapq на queue и изменить несколько строк в нашем классе Queue .

Вот наши изменения:

  1. удаляем импорт модуля heapq и добавляем импорт класса PriorityQueue из модуля queue
  2. self.queue = [] меняем на self.queue = PriorityQueue(), для создание экземпляра класса PriorityQueue
  3. удаляем строчку heapq.heapify(self.queue)
  4. в методе __repr__() меняем self.queue.__len__() на self.queue._qsize()
  5. heapq.heappush(self.queue, entry) меняем на self.queue.put(entry)
  6. в методе .pull() строчку entry = heapq.heappop(self.queue) меняем на
if self.queue._qsize() == 0:
    return None
else:
    entry = self.queue.get()

Пояснение: в случае если количество элементов в очереди будет равно 0 self.queue.get() будет выполнятся вечно, т.к. будет ожидать элемент из очереди. Для предотвращения такого поведения делаем проверку.

Итак, весь код выглядит вот так:

from queue import PriorityQueue


class Queue:
    def __new__(cls):
        if not hasattr(cls, 'instance'):
            cls.instance = super().__new__(cls)
        return cls.instance
    
    def __init__(self):
        self.queue = PriorityQueue()
        self.index = 0
    
    def __str__(self):
        return f"Last index element in queue: {self.index}"
    
    def __repr__(self):
        return f"Queue instance contains {self.queue._qsize()} elements."
        
    def push(self, item, priority=False):
        priority_level = 1
        if priority:
            priority.level = -1
        entry = (priority_level, self.index, item)
        self.queue.put(entry)
        self.index = self.index + 1
        
    def pull(self):
        if self.queue._qsize() == 0:
            return None
        else:
            entry = self.queue.get()
        return entry[2]

Таким образом наш класс Queue теперь стал поддерживать работу с потоками.

Пример использования:

from threading import Thread


scheduler = Queue()


def generate_links(item):
    for i in range(item):
        scheduler.push(i)
    # добавим одну приоритетную
    scheduler.push(item+1, priority=True)
    
    
def print_links():
    n=0
    while n is not None:
        n=scheduler.pull()
        print(f"Item in scheduler = {n}")
        
        
first_thread = Thread(target=generate_links, name='Generator', args=(5,))
seconf_thread = Thread(target=print_links, name='Printer')

first_thread.start()
str(scheduler)   # Last index element in queue: 5.
repr(scheduler)  # Queue instance contains 6 elements.
seconf_thread.start() 

# Вывод:
# Item in schedule = 6
# Item in schedule = 0
# Item in schedule = 1
# Item in schedule = 2
# Item in schedule = 3
# Item in schedule = 4
# Item in schedule = None ## None - очередь пуста