unified tilerenderer job

This commit is contained in:
NatureFreshMilk 2019-01-24 07:52:25 +01:00
parent 0a92589f71
commit 641cea560e
6 changed files with 194 additions and 113 deletions

View File

@ -6,22 +6,20 @@ import (
"mapserver/coords"
"mapserver/layer"
"os"
"sync"
"runtime"
"sync"
)
type Config struct {
Port int `json:"port"`
EnableInitialRendering bool `json:"enableinitialrendering"`
EnableIncrementalUpdate bool `json:"enableincrementalupdate"`
Webdev bool `json:"webdev"`
WebApi *WebApiConfig `json:"webapi"`
RenderState *RenderStateType `json:"renderstate"`
Layers []layer.Layer `json:"layers"`
InitialRenderingFetchLimit int `json:"initialrenderingfetchlimit"`
InitialRenderingJobs int `json:"initialrenderingjobs"`
InitialRenderingQueue int `json:"initialrenderingqueue"`
UpdateRenderingFetchLimit int `json:"updaterenderingfetchlimit"`
Port int `json:"port"`
EnableRendering bool `json:"enablerendering"`
Webdev bool `json:"webdev"`
WebApi *WebApiConfig `json:"webapi"`
RenderState *RenderStateType `json:"renderstate"`
Layers []layer.Layer `json:"layers"`
RenderingFetchLimit int `json:"renderingfetchlimit"`
RenderingJobs int `json:"renderingjobs"`
RenderingQueue int `json:"renderingqueue"`
}
type WebApiConfig struct {
@ -81,7 +79,7 @@ func ParseConfig(filename string) (*Config, error) {
LastX: coords.MinCoord - 1,
LastY: coords.MinCoord - 1,
LastZ: coords.MinCoord - 1,
LastMtime: 1,
LastMtime: 0,
}
layers := []layer.Layer{
@ -94,17 +92,15 @@ func ParseConfig(filename string) (*Config, error) {
}
cfg := Config{
Port: 8080,
EnableInitialRendering: true,
EnableIncrementalUpdate: true,
Webdev: false,
WebApi: &webapi,
RenderState: &rstate,
Layers: layers,
InitialRenderingFetchLimit: 1000,
InitialRenderingJobs: runtime.NumCPU(),
InitialRenderingQueue: 100,
UpdateRenderingFetchLimit: 1000,
Port: 8080,
EnableRendering: true,
Webdev: false,
WebApi: &webapi,
RenderState: &rstate,
Layers: layers,
RenderingFetchLimit: 1000,
RenderingJobs: runtime.NumCPU(),
RenderingQueue: 100,
}
info, err := os.Stat(filename)

View File

@ -75,9 +75,9 @@ func convertRows(pos int64, data []byte, mtime int64) Block {
const getLastBlockQuery = `
select pos,data,mtime
from blocks b
where b.mtime > ?
where b.mtime >= ?
and b.pos > ?
order by b.pos asc, b.mtime asc
order by b.mtime asc, b.pos asc
limit ?
`
@ -135,7 +135,6 @@ func (db *Sqlite3Accessor) CountBlocks(frommtime, tomtime int64) (int, error) {
return 0, nil
}
const getBlockQuery = `
select pos,data,mtime from blocks b where b.pos = ?
`

View File

@ -4,10 +4,9 @@ import (
"fmt"
"github.com/sirupsen/logrus"
"mapserver/app"
"mapserver/initialrenderer"
"mapserver/mapobject"
"mapserver/params"
"mapserver/tileupdate"
"mapserver/tilerendererjob"
"mapserver/web"
"runtime"
)
@ -62,13 +61,8 @@ func main() {
mapobject.Setup(ctx)
//run initial rendering
if ctx.Config.EnableInitialRendering && ctx.Config.RenderState.InitialRun {
go initialrenderer.Job(ctx)
}
//Incremental update
if ctx.Config.EnableIncrementalUpdate {
go tileupdate.Job(ctx)
if ctx.Config.EnableRendering {
go tilerendererjob.Job(ctx)
}
//Start http server

View File

@ -44,12 +44,22 @@ func (a *MapBlockAccessor) Update(pos coords.MapBlockCoords, mb *mapblockparser.
type FindMapBlocksResult struct {
HasMore bool
LastPos *coords.MapBlockCoords
LastMtime int64
List []*mapblockparser.MapBlock
UnfilteredCount int
}
func (a *MapBlockAccessor) FindMapBlocks(lastpos coords.MapBlockCoords, lastmtime int64, limit int, layerfilter []layer.Layer) (*FindMapBlocksResult, error) {
fields := logrus.Fields{
"x": lastpos.X,
"y": lastpos.Y,
"z": lastpos.Z,
"lastmtime": lastmtime,
"limit": limit,
}
logrus.WithFields(fields).Debug("FindMapBlocks")
blocks, err := a.accessor.FindBlocks(lastpos, lastmtime, limit)
if err != nil {
@ -65,6 +75,9 @@ func (a *MapBlockAccessor) FindMapBlocks(lastpos coords.MapBlockCoords, lastmtim
for _, block := range blocks {
newlastpos = &block.Pos
if result.LastMtime < block.Mtime {
result.LastMtime = block.Mtime
}
inLayer := false
for _, l := range layerfilter {
@ -107,7 +120,6 @@ func (a *MapBlockAccessor) FindMapBlocks(lastpos coords.MapBlockCoords, lastmtim
return &result, nil
}
func (a *MapBlockAccessor) GetMapBlock(pos coords.MapBlockCoords) (*mapblockparser.MapBlock, error) {
key := getKey(pos)

View File

@ -0,0 +1,156 @@
package tilerendererjob
import (
"github.com/sirupsen/logrus"
"mapserver/app"
"mapserver/coords"
"strconv"
"time"
)
func getTileKey(tc *coords.TileCoords) string {
return strconv.Itoa(tc.X) + "/" + strconv.Itoa(tc.Y) + "/" + strconv.Itoa(tc.Zoom)
}
func worker(ctx *app.App, coords <-chan *coords.TileCoords) {
for tc := range coords {
ctx.Objectdb.RemoveTile(tc)
_, err := ctx.Tilerenderer.Render(tc, 2)
if err != nil {
panic(err)
}
}
}
func Job(ctx *app.App) {
rstate := ctx.Config.RenderState
var totalLegacyCount int
var err error
if rstate.InitialRun {
totalLegacyCount, err = ctx.Blockdb.CountBlocks(0, 0)
if err != nil {
panic(err)
}
fields := logrus.Fields{
"totalLegacyCount": totalLegacyCount,
"LastMtime": rstate.LastMtime,
}
logrus.WithFields(fields).Info("Starting rendering job")
} else {
fields := logrus.Fields{
"LastMtime": rstate.LastMtime,
}
logrus.WithFields(fields).Info("Starting rendering job")
}
tilecount := 0
lastcoords := coords.NewMapBlockCoords(rstate.LastX, rstate.LastY, rstate.LastZ)
jobs := make(chan *coords.TileCoords, ctx.Config.RenderingQueue)
for i := 0; i < ctx.Config.RenderingJobs; i++ {
go worker(ctx, jobs)
}
for true {
start := time.Now()
result, err := ctx.BlockAccessor.FindMapBlocks(lastcoords, rstate.LastMtime, ctx.Config.RenderingFetchLimit, ctx.Config.Layers)
if err != nil {
panic(err)
}
if len(result.List) == 0 && !result.HasMore {
if rstate.InitialRun {
rstate.InitialRun = false
ctx.Config.Save()
fields := logrus.Fields{
"legacyblocks": rstate.LegacyProcessed,
}
logrus.WithFields(fields).Info("initial rendering complete")
}
time.Sleep(5 * time.Second)
continue
}
tileRenderedMap := make(map[string]bool)
for i := 12; i >= 1; i-- {
for _, mb := range result.List {
//13
tc := coords.GetTileCoordsFromMapBlock(mb.Pos, ctx.Config.Layers)
//12-1
tc = tc.ZoomOut(13 - i)
key := getTileKey(tc)
if tileRenderedMap[key] {
continue
}
tileRenderedMap[key] = true
fields := logrus.Fields{
"X": tc.X,
"Y": tc.Y,
"Zoom": tc.Zoom,
"LayerId": tc.LayerId,
}
logrus.WithFields(fields).Debug("Dispatching tile rendering (z11-1)")
tilecount++
jobs <- tc
}
}
lastcoords = *result.LastPos
rstate.LastMtime = result.LastMtime
if rstate.InitialRun {
//Save current positions of initial run
rstate.LastX = lastcoords.X
rstate.LastY = lastcoords.Y
rstate.LastZ = lastcoords.Z
rstate.LegacyProcessed += result.UnfilteredCount
}
ctx.Config.Save()
t := time.Now()
elapsed := t.Sub(start)
if rstate.InitialRun {
progress := int(float64(rstate.LegacyProcessed) / float64(totalLegacyCount) * 100)
fields := logrus.Fields{
"count": len(result.List),
"processed": rstate.LegacyProcessed,
"progress%": progress,
"X": lastcoords.X,
"Y": lastcoords.Y,
"Z": lastcoords.Z,
"elapsed": elapsed,
}
logrus.WithFields(fields).Info("Initial rendering")
} else {
fields := logrus.Fields{
"count": len(result.List),
"elapsed": elapsed,
}
logrus.WithFields(fields).Info("incremental rendering")
}
}
}

View File

@ -1,76 +0,0 @@
package tileupdate
import (
"mapserver/app"
"mapserver/coords"
"time"
"github.com/sirupsen/logrus"
)
func Job(ctx *app.App) {
rstate := ctx.Config.RenderState
fields := logrus.Fields{
"lastmtime": rstate.LastMtime,
}
logrus.WithFields(fields).Info("Starting incremental update")
for true {
mblist, err := ctx.BlockAccessor.FindLatestMapBlocks(rstate.LastMtime, ctx.Config.UpdateRenderingFetchLimit, ctx.Config.Layers)
if err != nil {
panic(err)
}
for _, mb := range mblist {
if mb.Mtime > rstate.LastMtime {
rstate.LastMtime = mb.Mtime
}
tc := coords.GetTileCoordsFromMapBlock(mb.Pos, ctx.Config.Layers)
if tc == nil {
panic("tile not in any layer")
}
for tc.Zoom > 1 {
tc = tc.GetZoomedOutTile()
ctx.Objectdb.RemoveTile(tc)
}
}
//Render zoom 12-1
for _, mb := range mblist {
tc := coords.GetTileCoordsFromMapBlock(mb.Pos, ctx.Config.Layers)
for tc.Zoom > 1 {
tc = tc.GetZoomedOutTile()
fields = logrus.Fields{
"X": tc.X,
"Y": tc.Y,
"Zoom": tc.Zoom,
"LayerId": tc.LayerId,
}
logrus.WithFields(fields).Debug("Dispatching tile rendering (update)")
_, err = ctx.Tilerenderer.Render(tc, 2)
if err != nil {
panic(err)
}
}
}
ctx.Config.Save()
if len(mblist) > 0 {
fields = logrus.Fields{
"count": len(mblist),
"lastmtime": rstate.LastMtime,
}
logrus.WithFields(fields).Info("incremental update")
}
time.Sleep(5 * time.Second)
}
}