# -*- coding: utf-8 -*-
"""
  Copyright 2012-2014 Grégory Soutadé

  This file is part of Dynastie.

  Dynastie is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Dynastie is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Dynastie.  If not, see <http://www.gnu.org/licenses/>.
"""
import os
import shutil
import hashlib
import inspect
import gzip
from unicodedata import normalize
from re import sub
from datetime import datetime
from django.db import models
from django.contrib.auth.models import User
from django.db.models.signals import pre_init, post_init, pre_delete, post_delete
from django.db.models.signals import pre_save, post_save
from django.dispatch import receiver
from dynastie.generators import *

# NOTE(review): the HTML tags inside the string literals of this file were
# lost when the file was extracted (each one was replaced by a raw line
# break).  The two spellings below are a reconstruction -- confirm them
# against the upstream repository, in particular _COMMENT_BREAK, which must
# match what is already stored in the comments table.
_REPORT_BREAK = '<br/>'
_COMMENT_BREAK = '<br />'

# Generator modules used when a blog has no `_generators` file.
_DEFAULT_ENGINES = ('post', 'index', 'category', 'tag',
                    'archive', 'atom', 'rss', 'all_posts')

# File name suffixes that also get a pre-compressed `.gz` twin in the output.
# This is a plain suffix match (no leading dot), as in the original code.
_GZIP_SUFFIXES = ('css', 'html', 'htm', 'xhtml', 'js')


def slugify(name):
    """Return a URL-friendly slug for *name*.

    Surrounding whitespace is stripped, the text is NFKD-normalized,
    spaces become dashes and the result is lower-cased.  Every character
    outside ``[a-zA-Z0-9_-]`` is then dropped and runs of dashes are
    collapsed into a single dash.
    """
    name = name.strip()
    name = normalize('NFKD', name).replace(' ', '-').lower()
    # remove `other` characters
    name = sub(r'[^a-zA-Z0-9_-]', '', name)
    # normalize dashes
    name = sub(r'-+', '-', name)
    return name


def _md5_digest(path):
    """Return the MD5 digest of the whole content of the file at *path*."""
    with open(path, 'rb') as f:
        return hashlib.md5(f.read()).digest()


class Language(models.Model):
    """A language, identified by its name and a short abbreviation."""
    name = models.CharField(max_length=255, unique=True)
    abbrev = models.CharField(max_length=4, unique=True)


class Blog(models.Model):
    """A blog, together with its source and generated-output directories."""
    name = models.CharField(max_length=255, unique=True)
    title = models.CharField(max_length=255)
    description = models.TextField(max_length=255, blank=True)
    keywords = models.TextField(blank=True)
    writers = models.ManyToManyField(User)

    # Non-persistent state.  `engines` is rebound to a fresh list by
    # load_generators(); the class-level default is only kept so that
    # get_engines() works before the first load.
    engines = list()
    src_path = ''
    output_path = ''
    report = ''

    def create_paths(self):
        """Compute `src_path` and `output_path` from $DYNASTIE_ROOT.

        The paths are built by plain concatenation, so DYNASTIE_ROOT is
        expected to end with a path separator.  Raises KeyError when the
        variable is not set.
        """
        root = os.environ['DYNASTIE_ROOT']
        self.src_path = root + 'sites/' + self.name
        self.output_path = root + 'sites/' + self.name + '_output'

    def create(self):
        """Create the `sites` directory and this blog's two directories."""
        self.create_paths()
        sites = os.environ['DYNASTIE_ROOT'] + 'sites'
        for path in (sites, self.src_path, self.output_path):
            if not os.path.exists(path):
                os.mkdir(path)

    def remove(self):
        """Delete this blog's source and output trees from disk."""
        for path in (self.src_path, self.output_path):
            if os.path.exists(path):
                shutil.rmtree(path)

    def load_generators(self):
        """(Re)build `self.engines` for this blog.

        When `<src_path>/_generators` exists, each line that does not start
        with `#` names one engine, looked up in this module's globals.
        Unknown names are reported on stdout and skipped.  Without that
        file the default engine list is used.
        """
        # Start from a fresh per-instance list: the class-level list is
        # shared by every Blog and would otherwise grow on every call.
        self.engines = []
        generators_file = self.src_path + '/_generators'
        if os.path.exists(generators_file):
            with open(generators_file, 'r') as f:
                for line in f:
                    if line.startswith("#"):
                        continue
                    engine = line.strip()
                    if not engine:
                        # Blank line: nothing to look up.
                        continue
                    if engine not in globals():
                        print('Engine ' + engine + ' doesn\'t exists')
                    else:
                        self.engines.append(globals()[engine])
        else:
            for engine in _DEFAULT_ENGINES:
                self.engines.append(globals()[engine])

    def get_engines(self):
        """Return the list of currently loaded engines."""
        return self.engines

    def _copy_file(self, srcname, dstname):
        """Copy one file if it is new or changed; return True when copied.

        A line describing the action is appended to `self.report`.
        """
        if os.path.exists(dstname):
            if _md5_digest(srcname) == _md5_digest(dstname):
                return False
            self.report = self.report + 'Update ' + dstname + _REPORT_BREAK + '\n'
        else:
            self.report = self.report + 'Add ' + dstname + _REPORT_BREAK + '\n'
        shutil.copy2(srcname, dstname)
        return True

    def _gzip_copy(self, srcname, dstname):
        """Write `<dstname>.gz` when *srcname* has a compressible suffix."""
        if not srcname.endswith(_GZIP_SUFFIXES):
            return
        gzname = dstname + '.gz'
        if os.path.exists(gzname):
            os.unlink(gzname)
        # Read as bytes: the gzip stream is opened in binary mode.
        with open(srcname, 'rb') as f:
            content = f.read()
        with gzip.open(gzname, 'wb') as f:
            f.write(content)

    def copytree(self, src, dst):
        """Recursively mirror *src* into *dst*, copying only what changed.

        Entries whose name starts with `_` or ends with `~` are skipped.
        Files are compared by MD5 and copied when new or different; copied
        files with a suffix in `_GZIP_SUFFIXES` also get a `.gz` twin.

        Errors do not stop the walk: they are collected as
        `(srcname, dstname, message)` tuples and raised together, as the
        single argument of an `Exception`, once every entry was visited.
        """
        errors = []
        for name in os.listdir(src):
            if name.startswith('_') or name.endswith('~'):
                continue
            srcname = os.path.join(src, name)
            dstname = os.path.join(dst, name)
            try:
                if os.path.islink(srcname) and not os.path.exists(dstname):
                    os.symlink(os.readlink(srcname), dstname)
                # As in the original, a symlink then falls through to the
                # directory/file handling below.
                if os.path.isdir(srcname):
                    if not os.path.exists(dstname):
                        os.makedirs(dstname)
                    # Recurse with our own walker (not shutil.copytree) so
                    # that filtering, reporting and gzip apply at every
                    # level, and do not return: siblings must still be
                    # processed.
                    self.copytree(srcname, dstname)
                elif self._copy_file(srcname, dstname):
                    self._gzip_copy(srcname, dstname)
                # XXX What about devices, sockets etc.?
            except (IOError, os.error) as why:
                errors.append((srcname, dstname, str(why)))
            # catch the Error from the recursive copytree so that we can
            # continue with other files
            except Exception as err:
                if err.args and isinstance(err.args[0], list):
                    errors.extend(err.args[0])
                else:
                    errors.append((srcname, dstname, str(err)))
        if errors:
            raise Exception(errors)

    def _report_header(self, start_time, subject=''):
        """Return the HTML header (start time and duration) of a report."""
        duration = datetime.now() - start_time
        t = _REPORT_BREAK * 2 + 'Generation of ' \
            + start_time.strftime("%d/%m/%Y at %H:%M:%S") + subject \
            + _REPORT_BREAK + '\n'
        t = t + 'Duration ' + str(duration) + _REPORT_BREAK * 2 + '\n'
        return t

    def generate(self, request):
        """Regenerate the whole blog and return an HTML report.

        Static files are mirrored with copytree(), then one generator class
        per engine module is instantiated and run.  A generator that raises
        is reported and skipped instead of aborting the whole generation.
        """
        start_time = datetime.now()
        self.report = ''
        self.load_generators()
        self.copytree(self.src_path, self.output_path)
        generated = []
        hash_posts = {}
        hash_posts_content = {}
        for engine in self.engines:
            if not inspect.ismodule(engine):
                continue
            for name, obj in inspect.getmembers(engine):
                if not (inspect.isclass(obj)
                        and obj.__module__.startswith("dynastie.generators")):
                    continue
                if obj.__module__ in generated:
                    continue
                # NOTE(review): generate_post() passes `request` as first
                # constructor argument while this call does not -- confirm
                # which signature the generators expect.
                e = obj(hash_posts, hash_posts_content)
                print('Go for {}'.format(e))
                try:
                    r = e.generate(self, self.src_path, self.output_path)
                except Exception as err:
                    # An exception cannot be concatenated to a str.
                    self.report = self.report + _REPORT_BREAK + '\n' + str(err)
                    continue
                generated.append(obj.__module__)
                if r is not None:
                    self.report = self.report + _REPORT_BREAK + '\n' + r
        return self._report_header(start_time) + self.report

    def generate_post(self, request, post):
        """Regenerate a single post and return an HTML report."""
        from dynastie.generators import post as post_engine
        start_time = datetime.now()
        self.report = ''
        self.load_generators()
        self.copytree(self.src_path, self.output_path)
        post_list = [post]
        hash_posts = {}
        hash_posts_content = {}
        for name, obj in inspect.getmembers(post_engine):
            if inspect.isclass(obj) \
               and obj.__module__.startswith("dynastie.generators") \
               and obj.__module__.endswith("post"):
                e = obj(request, hash_posts, hash_posts_content)
                # As in the original, the generator's report replaces the
                # one built by copytree().
                report = e._generate(self, self.src_path, self.output_path,
                                     post_list)
                self.report = report if report is not None else ''
                break
        header = self._report_header(start_time, ' post ' + str(post.id))
        return header + self.report


class Editor(models.Model):
    """A content editor, identified by its name."""
    name = models.CharField(max_length=255, unique=True)


class Category(models.Model):
    """A post category; `name_slug` is derived from `name` on save."""
    name = models.CharField(max_length=255, unique=True)
    name_slug = models.CharField(max_length=255)
    parent = models.ForeignKey('self', blank=True, null=True, on_delete=models.CASCADE)
    description = models.TextField(max_length=255, blank=True)
    blog = models.ForeignKey(Blog, on_delete=models.CASCADE)

    def save(self, *args, **kwargs):
        """Refresh the slug, then save (arguments are passed to Django)."""
        self.name_slug = slugify(self.name)
        super(Category, self).save(*args, **kwargs)

    def remove(self):
        """Delete the generated `category/<slug>` directory, if any."""
        output = self.blog.output_path + '/category/' + self.name_slug
        if os.path.exists(output):
            shutil.rmtree(output)


class Tag(models.Model):
    """A post tag; `name_slug` is derived from `name` on save."""
    name = models.CharField(max_length=255, unique=True)
    name_slug = models.CharField(max_length=255)
    blog = models.ForeignKey(Blog, on_delete=models.CASCADE)

    def save(self, *args, **kwargs):
        """Refresh the slug, then save (arguments are passed to Django)."""
        self.name_slug = slugify(self.name)
        super(Tag, self).save(*args, **kwargs)

    def remove(self):
        """Delete the generated `tag/<slug>` directory, if any."""
        output = self.blog.output_path + '/tag/' + self.name_slug
        if os.path.exists(output):
            shutil.rmtree(output)


class PostOnlyManager(models.Manager):
    """Manager restricted to rows whose `post_type` is 'P'."""

    def get_queryset(self):
        return super(PostOnlyManager, self).get_queryset().filter(post_type='P')


class Post(models.Model):
    """A blog post.  Its content lives in `<src_path>/_post/<pk>`."""
    objects = PostOnlyManager()
    title = models.CharField(max_length=255)
    title_slug = models.CharField(max_length=255)
    category = models.ForeignKey(Category, blank=True, null=True, on_delete=models.SET_NULL)
    published = models.BooleanField()
    creation_date = models.DateTimeField()
    modification_date = models.DateTimeField()
    front_page = models.BooleanField()
    author = models.ForeignKey(User, null=True, on_delete=models.SET_NULL)
    description = models.TextField(max_length=255, blank=True)
    keywords = models.TextField(blank=True)
    # NOTE(review): `null` has no effect on a ManyToManyField; it is left
    # untouched here to avoid generating a migration.
    tags = models.ManyToManyField(Tag, blank=True, null=True)
    blog = models.ForeignKey(Blog, on_delete=models.CASCADE)
    language = models.ForeignKey(Language, null=True, on_delete=models.CASCADE)
    CONTENT_HTML = 0
    CONTENT_TEXT = 1
    CONTENT_FORMAT = (
        (CONTENT_HTML, 'HTML'),
        (CONTENT_TEXT, 'Text'))
    content_format = models.IntegerField(choices=CONTENT_FORMAT, default=CONTENT_TEXT, blank=False, null=False)
    post_type = models.CharField(max_length=1, default='P')

    def getPath(self):
        """Return the site-relative path `/post/<YYYY>/<MM>/<slug>.html`."""
        filename = '/post/'
        filename = filename + self.creation_date.strftime("%Y") + '/' + self.creation_date.strftime("%m") + '/'
        filename = filename + self.title_slug + '.html'
        return filename

    def save(self, *args, **kwargs):
        """Normalize the title, refresh slug and modification date, save."""
        self.title = self.title.strip()
        self.title_slug = slugify(self.title)
        self.modification_date = datetime.now()
        super(Post, self).save(*args, **kwargs)

    def manageTags(self, tags):
        """Synchronize this post's tags with the comma-separated *tags*.

        Tags missing from the blog are created, tags named in *tags* are
        attached to the post and tags no longer named are detached.  Tags
        are matched by slug; empty entries are ignored.
        """
        # Requested tags, keyed by slug (first spelling wins).
        wanted = {}
        for raw in tags.split(','):
            tag_name = raw.strip()
            if tag_name == '':
                continue
            wanted.setdefault(slugify(tag_name), tag_name)

        # Tags already known for this blog, keyed by slug (first one wins).
        blog_tags = {}
        for t in Tag.objects.filter(blog_id=self.blog.id):
            blog_tags.setdefault(t.name_slug, t)

        # Create new tags.  They are recorded in `blog_tags` right away:
        # re-reading an already evaluated queryset would not show them.
        for tag_slug, tag_name in wanted.items():
            if tag_slug not in blog_tags:
                t = Tag(blog=self.blog, name=tag_name, name_slug=tag_slug)
                t.save()
                blog_tags[tag_slug] = t

        post_tags_list = list(Tag.objects.filter(post=self.id))
        attached = set(t.name_slug for t in post_tags_list)

        # Add new tags
        for tag_slug in wanted:
            if tag_slug not in attached:
                self.tags.add(blog_tags[tag_slug])

        # Remove old tags
        for t in post_tags_list:
            if t.name_slug not in wanted:
                self.tags.remove(t)

    def createPost(self, content, tags):
        """Write *content* (UTF-8) to `_post/<pk>` and update the tags."""
        output = self.blog.src_path
        if not os.path.exists(output + '/_post'):
            os.mkdir(output + '/_post')
        filename = output + '/_post/' + str(self.pk)
        with open(filename, 'wb') as f:
            f.write(content.encode('utf-8'))
        self.manageTags(tags)

    def remove(self):
        """Delete the post's source file and its generated output.

        The generated page and its `.gz` twin are removed, then the month
        and year output directories are pruned when they became empty.
        """
        b = self.blog

        # Source content.
        filename = b.src_path + '/_post/' + str(self.pk)
        if os.path.exists(filename):
            os.unlink(filename)

        # Generated page, then its compressed twin.
        filename = b.output_path + self.getPath()
        if os.path.exists(filename):
            os.unlink(filename)
        filename = filename + '.gz'
        if os.path.exists(filename):
            os.unlink(filename)

        # Month first: the year can only be empty once the month is gone.
        year_dir = b.output_path + '/post/' + self.creation_date.strftime("%Y") + '/'
        month_dir = year_dir + self.creation_date.strftime("%m") + '/'
        for directory in (month_dir, year_dir):
            if os.path.exists(directory) and len(os.listdir(directory)) == 0:
                os.rmdir(directory)

    def get_editor(self):
        """Return 'html' for HTML content, 'text' otherwise."""
        if self.content_format == Post.CONTENT_HTML:
            return 'html'
        else:
            return 'text'


class Draft(Post):
    """An unpublished post.  Its content lives in `<src_path>/_draft/<pk>`."""
    objects = models.Manager()

    def createDraft(self, content, tags):
        """Write *content* to `_draft/<pk>` if it changed, then save.

        Tags are synchronized and the draft is saved whether or not the
        content file had to be rewritten.
        """
        output = self.blog.src_path
        if not os.path.exists(output + '/_draft'):
            os.mkdir(output + '/_draft')
        filename = output + '/_draft/' + str(self.pk)
        content = content.encode('utf-8')

        modif = True
        if os.path.exists(filename):
            with open(filename, 'rb') as f:
                modif = f.read() != content
        if modif:
            with open(filename, 'wb') as f:
                f.write(content)

        self.manageTags(tags)
        self.save()

    def remove(self):
        """Delete the draft's content file, if any."""
        filename = self.blog.src_path + '/_draft/' + str(self.pk)
        if os.path.exists(filename):
            os.unlink(filename)

    def save(self, *args, **kwargs):
        """Force `published` to False, then save."""
        self.published = False
        super(Draft, self).save(*args, **kwargs)


class Comment(models.Model):
    """A reader comment attached to a post, optionally replying to another."""
    post = models.ForeignKey(Post, on_delete=models.CASCADE)
    parent = models.ForeignKey('self', null=True, blank=True, on_delete=models.CASCADE)
    date = models.DateTimeField()
    author = models.CharField(max_length=255)
    email = models.EmailField(max_length=255, blank=True)
    the_comment = models.TextField(max_length=255)
    ip = models.GenericIPAddressField()

    def _update_line_returns(self):
        """Replace line returns by HTML line breaks (storage form)."""
        self.the_comment = self.the_comment.replace('\n', _COMMENT_BREAK)

    def _remove_br(self):
        """Replace HTML line breaks by line returns (editing form)."""
        self.the_comment = self.the_comment.replace(_COMMENT_BREAK, '\n')


class FileOutputCache(models.Model):
    """A cached hash for a named output file."""
    name = models.CharField(max_length=512)
    hash = models.CharField(max_length=512)


@receiver(post_init, sender=Blog)
def init_blog_signal(sender, **kwargs):
    """Compute the blog's paths as soon as an instance is built."""
    kwargs['instance'].create_paths()


@receiver(post_delete, sender=Blog)
def delete_blog_signal(sender, **kwargs):
    """Remove the blog's directories once the blog is deleted."""
    kwargs['instance'].remove()


@receiver(pre_delete, sender=Category)
def delete_category_signal(sender, **kwargs):
    """Remove the category's generated output before it is deleted."""
    kwargs['instance'].remove()


@receiver(pre_delete, sender=Tag)
def delete_tag_signal(sender, **kwargs):
    """Remove the tag's generated output before it is deleted."""
    kwargs['instance'].remove()


@receiver(post_delete, sender=Post)
def delete_post_signal(sender, **kwargs):
    """Remove the post's files once the post is deleted."""
    kwargs['instance'].remove()


@receiver(post_delete, sender=Draft)
def delete_draft_signal(sender, **kwargs):
    """Remove the draft's file once the draft is deleted."""
    kwargs['instance'].remove()


@receiver(pre_delete, sender=Post)
def pre_delete_post_signal(sender, **kwargs):
    """Delete the comments of a post before the post itself."""
    post = kwargs['instance']
    Comment.objects.filter(post=post.id).delete()


# Replace line returns by HTML line breaks for generation
@receiver(pre_save, sender=Comment)
def pre_save_comment_signal(sender, **kwargs):
    kwargs['instance']._update_line_returns()


@receiver(pre_save, sender=Draft)
def pre_save_draft_signal(sender, **kwargs):
    """Mark the row as a draft ('D') before saving."""
    kwargs['instance'].post_type = 'D'