关注我的公众号YueTan进行交流探讨
欢迎关注我的APS仓库:https://github.com/yuetan1988/python-lekin

APS系列入门

本次解读的代码来自:

https://github.com/samy-barrech/Flexible-Job-Shop-Scheduling-Problem/tree/master/app

关于生产排程的问题。

点评

  • 方法相对全面
  • 没有体现出图工艺路线,按照线性做下去

数据

6   6   1   
6   1   3   1   1   1   3   1   2   6   1   4   7   1   6   3   1   5   6   
6   1   2   8   1   3   5   1   5   10  1   6   10  1   1   10  1   4   4   
6   1   3   5   1   4   4   1   6   8   1   1   9   1   2   1   1   5   7   
6   1   2   5   1   1   5   1   3   5   1   4   3   1   5   8   1   6   9   
6   1   3   9   1   2   3   1   5   5   1   6   4   1   1   3   1   4   1   
6   1   2   3   1   4   3   1   6   9   1   1   10  1   5   4   1   3   1   

第一行:代表6个job,6个机器,每个工序平均需要1台机器
之后的6行代表其中的6job,第二行是第一个job,需要6个工序来完成,第一个工序可以1台机器完成,机器3,需要1个单位时间。

OOD

注意从scheduler里记录已完成job, 待完成job; job里记录已完成actitity, 未完成activity; activity里记录已完成op, 待完成op

  • job
    • 待完成activity
    • 已完成activity
  • activity
    • 属于的job
    • 待完成operation
    • 已完成operation
    • 一个工序可以在多个机器完成,时间不一样的话。activity里会同时有多个op
  • operation
    • operation用那个机器
    • 花的时间
  • machine
    • operation
    • 增加operation, 最大operation
    • 处理的operation
  • 排产
    • 待完成job
    • 已完成job

规则

1 parse => jobs_list, machines_list

2 heuristic = Heuristic().select

3 Scheduler(machine_list, jobs_list)

4 Scheduler.run(heuristic)

def run(self, heuristic, verbose=True):
		# Disable print if verbose is False
		if not verbose:
			sys.stdout = None

		current_step = 0

		while len(self.__jobs_to_be_done) > 0:  # 只要有未完成工作
			current_step += 1

			best_candidates = heuristic(self.__jobs_to_be_done, self.__max_operations, current_step)  # 不同规则返回最佳candidate,一次完成一个工作
			for id_machine, candidates in best_candidates.items():  # 最佳候选里,是关于多个机器,其能承受最大工序的
				machine = self.__machines[id_machine - 1]  # id转化为对象
				for activity, operation in candidates:
					if not (machine.is_working_at_max_capacity() or activity.is_pending):  # 符合条件的分配给机器
						machine.add_operation(activity, operation)

			for machine in self.__machines:  # 机器时间开始流动
				machine.work()

			for job in self.__jobs_to_be_done:  # 检查更新job是否完整状态,更新未完成的job
				if job.is_done:
					self.__jobs_to_be_done = list(
						filter(lambda element: element.id_job != job.id_job, self.__jobs_to_be_done))
					self.__jobs_done.append(job)

		print(colored("[SCHEDULER]", "green"), "Done in " + str(current_step) + " units of time")

		# Reenable stdout
		if not verbose:
			sys.stdout = self.__original_stdout

		return current_step

对于每个job的排产位于heuristic函数中进行

class Heuristics:
	# When a choice between multiple operations is available, always pick the first one
	@staticmethod
	def select_first_operation(jobs_to_be_done, max_operations, _):
		best_candidates = {}

		for job in jobs_to_be_done:  # 从待完成的job里找到best_candidate
			current_activity = job.current_activity  # 向下选择activity/ operation,由于activity要按顺序完成,选择每个job当前
			best_operation = current_activity.shortest_operation  # 当前里最短的工序

			if best_candidates.get(best_operation.id_machine) is None:  # 如果最佳候选里暂时没有这台机器的安排
				best_candidates.update({best_operation.id_machine: [(current_activity, best_operation)]})
			elif len(best_candidates.get(best_operation.id_machine)) < max_operations:  # 如果最佳候选还能继续安排工作,加上去
				best_candidates.get(best_operation.id_machine).append((current_activity, best_operation))
			else:  # 机器已经满载了,那么就从里面选
				list_operations = best_candidates.get(best_operation.id_machine)  

				for key, (_, operation) in enumerate(list_operations):
					if operation.duration < best_operation.duration:  # 原本候选集里比最新这个时间更短的,pop一个出去
						list_operations.pop(key)
						break

				if len(list_operations) < max_operations:  # 如果有pop,就把最新的加上去。否则其实还是原本的候选
					list_operations.append((current_activity, best_operation))

		return best_candidates

时间流动

亲测

请添加图片描述

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐