Flask app with sqlalchemy trying to schedule a job with Flask-APScheduler
·
Answer a question
It works well when executing the flask app, but we get the following error when executing in Flask-APScheduler context: AttributeError: Neither 'InstrumentedAttribute' object nor 'Comparator' object associated with SMSMessage.logs has an attribute 'count'
We sub-classed the APScheduler class because we felt we could force it to use our flask context.
My model is defined as:
class SMSMessage(BaseModel):
__tablename__ = 'sms_message'
logs = relationship("SMSLog", backref='sms', lazy='dynamic')
@hybrid_property
def last_status(self):
if self.logs.count() > 0:
last_status = self.logs.order_by(SMSLog.date_created.desc()).first()
return last_status.at_status_code
return "NA"
The job function is defined in main function file as:
import os
import dateutil.parser
from flask_apscheduler import APScheduler as _BaseAPScheduler
from app import settings
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from flask import Blueprint, Flask
from datetime import datetime
from app.database import db
from app.database.models import SMSMessage, SMSLog
class APScheduler(_BaseAPScheduler):
def run_job(self, id, jobstore=None):
with self.app.app_context():
super().run_job(id=id, jobstore=jobstore)
application = Flask(__name__)
def run_queued_messages():
with application.app_context():
sms_messages = SMSMessage.query.filter(
SMSMessage.last_status == 'Queued',
SMSMessage.send_at <= datetime.now()
).all()
for sms_message in sms_messages:
print('do something')
def configure_app(flask_app):
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_DATABASE = os.getenv("DB_DATABASE")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
flask_app.config['SQLALCHEMY_DATABASE_URI'] = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}'
flask_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = settings.SQLALCHEMY_TRACK_MODIFICATIONS
flask_app.config['SCHEDULER_JOBSTORES'] = {
'default': SQLAlchemyJobStore(
url=flask_app.config['SQLALCHEMY_DATABASE_URI'],
tableschema=os.getenv('SCHEMA_NAME')
),
}
# do not allow for api access job management
flask_app.config['SCHEDULER_API_ENABLED'] = False
flask_app.config['JOBS'] = [
{
'id': None,
'func': run_queued_messages,
'trigger': 'interval',
'seconds': 1,
'replace_existing': True
}
]
flask_app.config['SCHEDULER_EXECUTORS'] = {
'default': {'type': 'threadpool', 'max_workers': 20}
}
flask_app.config['SCHEDULER_JOB_DEFAULTS'] = {
'coalesce': False,
'max_instances': 3
}
def initialize_app(flask_app):
configure_app(flask_app)
db.init_app(flask_app)
initialize_app(application)
db.application = application
scheduler = APScheduler()
scheduler.init_app(application)
scheduler.start()
Answers
self.logsis not a collection of logs, but a reference toLogsas a relationship. You actually need to fetch the count first;- It's bad practice to
count()when you actually just need to know if any logexists(). If we were cooking dinner and I asked if there was still any rice, I wouldn't expect you to count the individual grains of rice, but to just check if the pot was empty; - If no match exists,
.first()returnsNone. Use it to your advantage:
@hybrid_property
def last_status(self):
last_status = SMSLog.query\
.filter(SMSLog.message_id == self.id)\
.order_by(SMSLog.date_created.desc())\
.first()
return last_status.at_status_code if last_status is not None else "NA"
更多推荐

所有评论(0)