Dynastie/dynastie/models.py
2022-10-08 16:08:02 +02:00

510 lines
18 KiB
Python
Executable File

# -*- coding: utf-8 -*-
"""
Copyright 2012-2014 Grégory Soutadé
This file is part of Dynastie.
Dynastie is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Dynastie is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Dynastie. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import shutil
import hashlib
import inspect
import gzip
from unicodedata import normalize
from re import sub
from datetime import datetime
from django.db import models
from django.contrib.auth.models import User
from django.db.models.signals import pre_init, post_init, pre_delete, post_delete
from django.db.models.signals import pre_save, post_save
from django.dispatch import receiver
from dynastie.generators import *
def slugify(name):
    """Turn *name* into a lowercase, dash-separated ASCII slug.

    The text is stripped, decomposed with NFKD (so accents become separate
    combining marks), spaces become dashes, every character outside
    ``[a-zA-Z0-9_-]`` is dropped and runs of dashes are collapsed.
    """
    decomposed = normalize('NFKD', name.strip())
    dashed = decomposed.replace(' ', '-').lower()
    # Remove every `other` character (this also drops the combining marks
    # produced by the NFKD decomposition)
    cleaned = sub('[^a-zA-Z0-9_-]', '', dashed)
    # Normalize dashes: a run of dashes becomes a single one
    return sub('-+', '-', cleaned)
class Language(models.Model):
    """A language, identified by a unique name and a unique abbreviation."""

    # Full name of the language
    name = models.CharField(max_length=255, unique=True)
    # Short code, at most 4 characters
    abbrev = models.CharField(max_length=4, unique=True)
class Blog(models.Model):
    """A blog and the on-disk trees it is generated from and into.

    Sources live in ``$DYNASTIE_ROOT + 'sites/' + name`` and the generated
    site in the same path suffixed with ``_output``. ``DYNASTIE_ROOT`` is
    concatenated as-is, so it is expected to end with a path separator.
    """

    name = models.CharField(max_length=255, unique=True)
    title = models.CharField(max_length=255)
    description = models.TextField(max_length=255, blank=True)
    keywords = models.TextField(blank=True)
    writers = models.ManyToManyField(User)

    # Non-persistent state. These are class-level defaults only:
    # load_generators() rebinds a fresh per-instance list so that engines
    # are never shared between Blog instances.
    engines = list()
    src_path = ''
    output_path = ''
    report = ''

    def create_paths(self):
        """Compute ``src_path`` and ``output_path`` from ``DYNASTIE_ROOT``.

        Raises KeyError when the environment variable is not set.
        """
        self.src_path = os.environ['DYNASTIE_ROOT'] + 'sites/' + self.name
        self.output_path = os.environ['DYNASTIE_ROOT'] + 'sites/' + self.name + '_output'

    def create(self):
        """Create the ``sites`` directory and this blog's source/output dirs."""
        self.create_paths()
        sites_dir = os.environ['DYNASTIE_ROOT'] + 'sites'
        for path in (sites_dir, self.src_path, self.output_path):
            if not os.path.exists(path):
                os.mkdir(path)

    def remove(self):
        """Delete the source and output trees of this blog, if present."""
        for path in (self.src_path, self.output_path):
            if os.path.exists(path):
                shutil.rmtree(path)

    def load_generators(self):
        """(Re)build ``self.engines``.

        When ``<src_path>/_generators`` exists, each non-comment, non-blank
        line names a generator looked up in this module's globals; unknown
        names are reported on stdout. Otherwise a default set is used.
        """
        # Start from a fresh list: appending to the class-level default
        # would leak engines across instances and duplicate them on every
        # call.
        engines = []
        generators_file = self.src_path + '/_generators'
        if os.path.exists(generators_file):
            with open(generators_file, 'r') as f:
                for line in f:
                    if line.startswith("#"):
                        continue
                    engine = line.strip()
                    if not engine:
                        # Blank line
                        continue
                    if engine not in globals():
                        print('Engine ' + engine + ' doesn\'t exists')
                    else:
                        engines.append(globals()[engine])
        else:
            default_engines = ('post', 'index', 'category', 'tag',
                               'archive', 'atom', 'rss', 'all_posts')
            for engine in default_engines:
                engines.append(globals()[engine])
        self.engines = engines

    def get_engines(self):
        """Return the engines selected by the last load_generators() call."""
        return self.engines

    @staticmethod
    def _file_digest(path):
        """Return the MD5 digest of the file at *path* (change detection only)."""
        md5 = hashlib.md5()
        with open(path, 'rb') as f:
            md5.update(f.read())
        return md5.digest()

    def copytree(self, src, dst):
        """Recursively mirror *src* into *dst*, copying only what changed.

        Entries whose name starts with ``_`` or ends with ``~`` are skipped.
        Added/updated files are appended to ``self.report``; css/html/htm/
        xhtml/js files that were copied also get a ``.gz`` sibling.

        Raises Exception(errors) at the end when something failed, where
        ``errors`` is a list of ``(srcname, dstname, reason)`` tuples.
        """
        gz_suffixes = ('css', 'html', 'htm', 'xhtml', 'js')
        errors = []
        for name in os.listdir(src):
            if name.startswith('_') or name.endswith('~'):
                continue
            srcname = os.path.join(src, name)
            dstname = os.path.join(dst, name)
            try:
                if os.path.islink(srcname) and not os.path.lexists(dstname):
                    linkto = os.readlink(srcname)
                    os.symlink(linkto, dstname)
                    # The link itself is the copy; do not follow it
                    continue
                if os.path.isdir(srcname):
                    if not os.path.exists(dstname):
                        os.makedirs(dstname)
                    # Always recurse (never return): siblings that follow
                    # this directory must still be processed, and new
                    # directories get the same filtering/gzip/report rules.
                    self.copytree(srcname, dstname)
                else:
                    copied = False
                    if os.path.exists(dstname):
                        if self._file_digest(srcname) != self._file_digest(dstname):
                            self.report = self.report + 'Update ' + dstname + '<br/>\n'
                            shutil.copy2(srcname, dstname)
                            copied = True
                    else:
                        self.report = self.report + 'Add ' + dstname + '<br/>\n'
                        shutil.copy2(srcname, dstname)
                        copied = True

                    if copied and srcname.endswith(gz_suffixes):
                        gzname = dstname + '.gz'
                        if os.path.exists(gzname):
                            os.unlink(gzname)
                        # Read bytes: gzip.open(..., 'wb') only accepts bytes
                        with open(srcname, 'rb') as f:
                            content = f.read()
                        with gzip.open(gzname, 'wb') as f:
                            f.write(content)
                # XXX What about devices, sockets etc.?
            except (IOError, os.error) as why:
                errors.append((srcname, dstname, str(why)))
            # catch the Error from the recursive copytree so that we can
            # continue with other files
            except Exception as err:
                detail = err.args[0] if err.args else None
                if isinstance(detail, list):
                    # Error list raised by a nested copytree() call
                    errors.extend(detail)
                else:
                    errors.append((srcname, dstname, str(err)))
        if errors:
            raise Exception(errors)

    def generate(self, request):
        """Regenerate the whole site and return an HTML report string.

        Static files are mirrored with copytree(), then every class defined
        in a ``dynastie.generators*`` module of each loaded engine is
        instantiated and run once per module. *request* is not used by this
        method.
        """
        start_time = datetime.now()
        self.report = ''
        self.load_generators()
        self.copytree(self.src_path, self.output_path)
        generated = []
        hash_posts = {}
        hash_posts_content = {}
        for engine in self.engines:
            if not inspect.ismodule(engine):
                continue
            for name, obj in inspect.getmembers(engine):
                if not (inspect.isclass(obj) and obj.__module__.startswith("dynastie.generators")):
                    continue
                if obj.__module__ in generated:
                    continue
                # NOTE(review): generate_post() builds generators with
                # (request, hash_posts, hash_posts_content) while this call
                # omits request -- confirm against the generator constructor.
                e = obj(hash_posts, hash_posts_content)
                print('Go for {}'.format(e))
                try:
                    r = e.generate(self, self.src_path, self.output_path)
                except Exception as err:
                    # Record the failure and go on with the other generators
                    self.report = self.report + '<br/>\n' + str(err)
                    continue
                generated.append(obj.__module__)
                if r is not None:
                    self.report = self.report + '<br/>\n' + r
        duration = datetime.now() - start_time
        t = '<br/><br/>Generation of ' + start_time.strftime("%d/%m/%Y at %H:%M:%S") + '<br/>\n'
        t = t + 'Duration ' + str(duration) + '<br/><br/>\n'
        return t + self.report

    def generate_post(self, request, post):
        """Regenerate a single *post* and return an HTML report string.

        Static files are mirrored first, then the first generator class
        found in the ``dynastie.generators.post`` module is run on a
        one-element post list.
        """
        from dynastie.generators import post as PostGenerator
        start_time = datetime.now()
        self.report = ''
        self.load_generators()
        self.copytree(self.src_path, self.output_path)
        post_list = [post]
        hash_posts = {}
        hash_posts_content = {}
        for name, obj in inspect.getmembers(PostGenerator):
            if inspect.isclass(obj) and obj.__module__.startswith("dynastie.generators") \
                    and obj.__module__.endswith("post"):
                e = obj(request, hash_posts, hash_posts_content)
                # Keep the report a string even if the generator returns None
                self.report = e._generate(self, self.src_path, self.output_path, post_list) or ''
                break
        duration = datetime.now() - start_time
        t = '<br/><br/>Generation of ' + start_time.strftime("%d/%m/%Y at %H:%M:%S") + ' post ' + str(post.id) + '<br/>\n'
        t = t + 'Duration ' + str(duration) + '<br/><br/>\n'
        return t + self.report
class Editor(models.Model):
    """An editor, identified only by its unique name."""

    name = models.CharField(max_length=255, unique=True)
class Category(models.Model):
    """A category of a blog, optionally nested under a parent category."""

    name = models.CharField(max_length=255, unique=True)
    # Derived from ``name`` on every save()
    name_slug = models.CharField(max_length=255)
    parent = models.ForeignKey('self', blank=True, null=True, on_delete=models.CASCADE)
    description = models.TextField(max_length=255, blank=True)
    blog = models.ForeignKey(Blog, on_delete=models.CASCADE)

    def save(self, *args, **kwargs):
        """Refresh ``name_slug`` from ``name``, then save.

        Arguments are forwarded untouched so that callers passing
        ``force_insert``, ``using``, ``update_fields``... keep working.
        """
        self.name_slug = slugify(self.name)
        super(Category, self).save(*args, **kwargs)

    def remove(self):
        """Delete this category's generated output directory, if any."""
        if not self.name_slug:
            # An empty slug would resolve to the shared 'category/'
            # directory and wipe every category's output.
            return
        blog = Blog.objects.get(pk=self.blog.id)
        output = blog.output_path + '/category/' + self.name_slug
        if os.path.exists(output):
            shutil.rmtree(output)
class Tag(models.Model):
    """A tag that posts of a blog can be labelled with."""

    name = models.CharField(max_length=255, unique=True)
    # Derived from ``name`` on every save()
    name_slug = models.CharField(max_length=255)
    blog = models.ForeignKey(Blog, on_delete=models.CASCADE)

    def save(self, *args, **kwargs):
        """Refresh ``name_slug`` from ``name``, then save.

        Arguments are forwarded untouched so that callers passing
        ``force_insert``, ``using``, ``update_fields``... keep working.
        """
        self.name_slug = slugify(self.name)
        super(Tag, self).save(*args, **kwargs)

    def remove(self):
        """Delete this tag's generated output directory, if any."""
        if not self.name_slug:
            # An empty slug would resolve to the shared 'tag/' directory
            # and wipe every tag's output.
            return
        blog = Blog.objects.get(pk=self.blog.id)
        output = blog.output_path + '/tag/' + self.name_slug
        if os.path.exists(output):
            shutil.rmtree(output)
class PostOnlyManager(models.Manager):
    """Manager whose queryset only contains rows with ``post_type == 'P'``."""

    def get_queryset(self):
        """Return the default queryset narrowed to ``post_type='P'``."""
        everything = super(PostOnlyManager, self).get_queryset()
        return everything.filter(post_type='P')
class Post(models.Model):
    """A blog post.

    The post body is not stored in the database: createPost() writes it to
    ``<blog.src_path>/_post/<pk>``. ``Post.objects`` only returns rows whose
    ``post_type`` is ``'P'``.
    """

    objects = PostOnlyManager()
    title = models.CharField(max_length=255)
    # Derived from ``title`` on every save()
    title_slug = models.CharField(max_length=255)
    category = models.ForeignKey(Category, blank=True, null=True, on_delete=models.SET_NULL)
    published = models.BooleanField()
    creation_date = models.DateTimeField()
    # Overwritten with the current time on every save()
    modification_date = models.DateTimeField()
    front_page = models.BooleanField()
    author = models.ForeignKey(User, null=True, on_delete=models.SET_NULL)
    description = models.TextField(max_length=255, blank=True)
    keywords = models.TextField(blank=True)
    tags = models.ManyToManyField(Tag, blank=True, null=True)
    blog = models.ForeignKey(Blog, on_delete=models.CASCADE)
    language = models.ForeignKey(Language, null=True, on_delete=models.CASCADE)
    CONTENT_HTML = 0
    CONTENT_TEXT = 1
    CONTENT_FORMAT = (
        (CONTENT_HTML, 'HTML'),
        (CONTENT_TEXT, 'Text'))
    content_format = models.IntegerField(choices=CONTENT_FORMAT, default=CONTENT_TEXT, blank=False, null=False)
    post_type = models.CharField(max_length=1, default='P')

    def getPath(self):
        """Return the site-relative path ``/post/<YYYY>/<MM>/<title_slug>.html``."""
        filename = '/post/'
        filename = filename + self.creation_date.strftime("%Y") + '/' + self.creation_date.strftime("%m") + '/'
        filename = filename + self.title_slug + '.html'
        return filename

    def save(self, *args, **kwargs):
        """Normalize the title, refresh slug and modification date, then save.

        Arguments are forwarded untouched so that callers passing
        ``force_insert``, ``using``, ``update_fields``... keep working.
        """
        self.title = self.title.strip()
        self.title_slug = slugify(self.title)
        self.modification_date = datetime.now()
        super(Post, self).save(*args, **kwargs)

    def manageTags(self, tags):
        """Synchronize this post's tags with the comma-separated *tags*.

        Tags are matched by slug. Tags unknown to the blog are created,
        missing ones are attached to the post and tags no longer listed are
        detached. An empty (or blank) string detaches everything.
        """
        # Requested tags keyed by slug; the first spelling of a slug wins
        wanted = {}
        for raw in tags.split(','):
            tag_name = raw.strip()
            if not tag_name:
                continue
            wanted.setdefault(slugify(tag_name), tag_name)

        # Tags already known for this blog, keyed by slug (first match wins)
        known = {}
        for t in Tag.objects.filter(blog_id=self.blog.id):
            known.setdefault(t.name_slug, t)

        # Create new tags, and remember them: a queryset evaluated before
        # the creation would not contain them, so they could never be
        # attached below.
        for tag_slug, tag_name in wanted.items():
            if tag_slug not in known:
                t = Tag(blog=self.blog, name=tag_name, name_slug=tag_slug)
                t.save()
                known[tag_slug] = t

        # Snapshot of the tags attached before this call
        post_tags = list(Tag.objects.filter(post=self.id))
        attached = set(t.name_slug for t in post_tags)

        # Add new tags
        for tag_slug in wanted:
            if tag_slug not in attached:
                self.tags.add(known[tag_slug])

        # Remove old tags
        for t in post_tags:
            if t.name_slug not in wanted:
                self.tags.remove(t)

    def createPost(self, content, tags):
        """Write *content* (UTF-8) to ``_post/<pk>`` and synchronize *tags*."""
        output = self.blog.src_path
        if not os.path.exists(output + '/_post'):
            os.mkdir(output + '/_post')
        filename = output + '/_post/' + str(self.pk)
        with open(filename, 'wb') as f:
            f.write(content.encode('utf-8'))
        self.manageTags(tags)

    def remove(self):
        """Delete the post source, its generated page and emptied date dirs."""
        b = self.blog

        # Source file
        filename = b.src_path + '/_post/' + str(self.pk)
        if os.path.exists(filename):
            os.unlink(filename)

        # Generated page and its gzipped sibling (must target the output
        # tree, not the source file removed just above)
        filename = b.output_path + self.getPath()
        if os.path.exists(filename):
            os.unlink(filename)
        filename = filename + '.gz'
        if os.path.exists(filename):
            os.unlink(filename)

        # Month then year directory of the output tree, when left empty.
        # NOTE(review): the month directory was previously looked up under
        # src_path; posts are generated under output_path (see getPath()),
        # which is also where the year directory is checked.
        year = self.creation_date.strftime("%Y")
        month = self.creation_date.strftime("%m")
        month_dir = b.output_path + '/post/' + year + '/' + month + '/'
        if os.path.exists(month_dir) and len(os.listdir(month_dir)) == 0:
            os.rmdir(month_dir)
        year_dir = b.output_path + '/post/' + year + '/'
        if os.path.exists(year_dir) and len(os.listdir(year_dir)) == 0:
            os.rmdir(year_dir)

    def get_editor(self):
        """Return ``'html'`` for HTML content, ``'text'`` otherwise."""
        if self.content_format == Post.CONTENT_HTML:
            return 'html'
        else:
            return 'text'
class Draft(Post):
    """A post that is never published; its body lives in ``_draft/<pk>``."""

    # Plain manager: unlike Post.objects, no filtering on post_type
    objects = models.Manager()

    def createDraft(self, content, tags):
        """Store *content* (UTF-8) as the draft body, sync *tags* and save.

        The file is only rewritten when its content actually changed.
        """
        b = self.blog
        output = b.src_path
        if not os.path.exists(output + '/_draft'):
            os.mkdir(output + '/_draft')
        filename = output + '/_draft/' + str(self.pk)
        content = content.encode('utf-8')
        modif = True
        if os.path.exists(filename):
            with open(filename, 'rb') as f:
                if f.read() == content:
                    modif = False
            if modif:
                os.unlink(filename)
        if modif:
            with open(filename, 'wb') as f:
                f.write(content)
        self.manageTags(tags)
        self.save()

    def remove(self):
        """Delete the draft body file, if present."""
        b = self.blog
        output = b.src_path
        filename = output + '/_draft/' + str(self.pk)
        if os.path.exists(filename):
            os.unlink(filename)

    def save(self, *args, **kwargs):
        """Force ``published`` to False, then save.

        Arguments are forwarded untouched so that callers passing
        ``force_insert``, ``using``, ``update_fields``... keep working.
        """
        self.published = False
        super(Draft, self).save(*args, **kwargs)
class Comment(models.Model):
    """A reader comment on a post, optionally replying to another comment."""

    post = models.ForeignKey(Post, on_delete=models.CASCADE)
    parent = models.ForeignKey('self', null=True, blank=True, on_delete=models.CASCADE)
    date = models.DateTimeField()
    author = models.CharField(max_length=255)
    email = models.EmailField(max_length=255, blank=True)
    the_comment = models.TextField(max_length=255)
    ip = models.GenericIPAddressField()

    def _update_line_returns(self):
        """Replace every newline of the comment text with ``<br />``."""
        with_breaks = self.the_comment.replace('\n', '<br />')
        self.the_comment = with_breaks

    def _remove_br(self):
        """Inverse of _update_line_returns(): turn ``<br />`` into newlines."""
        with_newlines = self.the_comment.replace('<br />', '\n')
        self.the_comment = with_newlines
class FileOutputCache(models.Model):
    """Associates a name with a hash string (both up to 512 characters)."""

    name = models.CharField(max_length=512)
    hash = models.CharField(max_length=512)
@receiver(post_init, sender=Blog)
def init_blog_signal(sender, **kwargs):
    """Compute the source/output paths of every freshly built Blog."""
    blog = kwargs['instance']
    blog.create_paths()
@receiver(post_delete, sender=Blog)
def delete_blog_signal(sender, **kwargs):
    """Remove the on-disk trees of a Blog once it has been deleted."""
    blog = kwargs['instance']
    blog.remove()
@receiver(pre_delete, sender=Category)
def delete_category_signal(sender, **kwargs):
    """Remove a Category's generated output just before it is deleted."""
    category = kwargs['instance']
    category.remove()
@receiver(pre_delete, sender=Tag)
def delete_tag_signal(sender, **kwargs):
    """Remove a Tag's generated output just before it is deleted."""
    tag = kwargs['instance']
    tag.remove()
@receiver(post_delete, sender=Post)
def delete_post_signal(sender, **kwargs):
    """Remove a Post's files once it has been deleted."""
    deleted_post = kwargs['instance']
    deleted_post.remove()
@receiver(post_delete, sender=Draft)
def delete_draft_signal(sender, **kwargs):
    """Remove a Draft's body file once it has been deleted."""
    draft = kwargs['instance']
    draft.remove()
@receiver(pre_delete, sender=Post)
def pre_delete_post_signal(sender, **kwargs):
    """Delete every Comment attached to a Post just before the Post goes."""
    doomed_post = kwargs['instance']
    Comment.objects.filter(post=doomed_post.id).delete()
# Line returns are stored as <br /> so that generation can emit them as-is
@receiver(pre_save, sender=Comment)
def pre_save_comment_signal(sender, **kwargs):
    """Convert newlines of a Comment into ``<br />`` before it is saved."""
    comment = kwargs['instance']
    comment._update_line_returns()
@receiver(pre_save, sender=Draft)
def pre_save_draft_signal(sender, **kwargs):
    """Mark the instance as a draft (``post_type = 'D'``) before saving."""
    draft = kwargs['instance']
    draft.post_type = 'D'