Index: examples/nntp/dirstore ================================================================== --- examples/nntp/dirstore +++ examples/nntp/dirstore @@ -13,30 +13,35 @@ # and id handling. # Signature (syntax) of the storage command: # # (1) last => Returns last id processed. -# (2) save => Take message through stdin, and save, mark as last. - - +# (2) names => Returns list of all known ids +# (3) save => Take message through stdin, and save, mark as last. +# (4) get => Return message through stdout. +# (5) has => Return boolean indicating existence of message +# +# Here, = dirstore proc main {} { if {![cmdline]} usage $::method } proc cmdline {} { - global argv directory method saveid + global argv directory method theid theid if {[llength $argv] < 2} {return 0} # Retrieve arguments lassign $argv directory method - if {$method eq "save"} { + if {$method ni {last save get has names}} {return 0} + + if {$method in {save get has}} { if {[llength $argv] != 3} {return 0} - set saveid [lindex $argv 2] + set theid [lindex $argv 2] } else { if {[llength $argv] != 2} {return 0} } validatedir store $directory @@ -50,11 +55,11 @@ if {![file writable $path]} { stop "$which not writable: $path" } } proc usage {} { global argv0 - puts stderr "$argv0: wrong # args, should be \"$argv0 last|(save )\"" + puts stderr "$argv0: wrong # args, should be \"$argv0 last|names|(save )|(get )\"" exit 1 } proc stop {text} { global argv0 @@ -70,20 +75,47 @@ set id [string trim [fileutil::cat $directory/last]] } puts $id return } + +proc names {} { + global directory + set ids {} + if {[file exists $directory]} { + foreach id [lsort -dict [glob -nocomplain -directory $directory -tails q*]] { + lappend ids [string range $id 1 end] + } + } + puts [join $ids \n] + return +} proc save {} { - global directory saveid + global directory theid set dst [open $directory/current w] fcopy stdin $dst close $dst - file rename -force $directory/current $directory/q$saveid + file rename -force 
$directory/current $directory/q$theid + + fileutil::writeFile $directory/last $theid + return +} + +proc get {} { + global directory theid + + set src [open $directory/q$theid r] + fcopy $src stdout + close $src + return +} - fileutil::writeFile $directory/last $saveid +proc has {} { + global directory theid + puts -nonewline stdout [file exists $directory/q$theid] return } main exit ADDED examples/nntp/movenews Index: examples/nntp/movenews ================================================================== --- /dev/null +++ examples/nntp/movenews @@ -0,0 +1,110 @@ +#!/usr/bin/env tclsh +## -*- tcl -*- + +package require Tcl 8.5 + +# This application talks to two stores and transfers the messages from +# the source to the destination. + +proc main {} { + if {![cmdline]} usage + movemessages +} + +proc cmdline {} { + global argv src dst keepid conflict + + set keepid no + set conflict error + + while {[string match -* [set o [lindex $argv 0]]]} { + switch -exact $o { + -K - --keepid { set keepid yes } + -I - --ignore { set conflict ignore } + default { return 0 } + } + set argv [lrange $argv 1 end] + } + + if {[llength $argv] != 2} {return 0} + + # Retrieve arguments + set argv [lassign $argv src dst] + + if {![llength $src]} { return 0 } + if {![llength $dst]} { return 0 } + + return 1 +} + +proc usage {} { + global argv0 + puts stderr "$argv0: wrong # args, should be \"$argv0 ?--keepid|-K? ?--plain|-P? 
src-cmd dst-cmd\"" + exit 1 +} + +proc stop {text} { + global argv0 + puts stderr "$argv0: $text" + exit 1 +} + +proc log {text} { + puts -nonewline $text + flush stdout + return +} + +proc movemessages {} { + global src dst keepid conflict + + set ids [store_cmd $src {} names] + + set lasthandled [store_cmd $dst {} last] + if {$lasthandled eq {}} { + set lasthandled -1 + } + + foreach id $ids { + incr lasthandled + if {$keepid} { + if {($id < $lasthandled) || [store_cmd $dst {} has $id]} { + switch -exact $conflict { + error { + log "conflict $id\n" + exit 1 + } + ignore { + log "ignoring $id\n" + incr lasthandled -1 + continue + } + } + } + set lasthandled $id + } + + log "reading $id ... " + set msg [store_cmd $src {} get $id] + + log "saving to $lasthandled ..." + set r [store_cmd $dst $msg save $lasthandled] + + if {$r ne {}} { log " $r" } + log \n + } + return +} + +proc store_cmd {storecommand si args} { + #puts "run: [list {*}$storecommand {*}$args]" + + if {$si ne {}} { + return [exec << $si {*}$storecommand {*}$args] + } else { + return [exec {*}$storecommand {*}$args] + } +} + +main +exit Index: examples/nntp/pullnews ================================================================== --- examples/nntp/pullnews +++ examples/nntp/pullnews @@ -13,12 +13,10 @@ # Signature (syntax) of the storage command: # # (1) last => Returns last id processed. # (2) save => Take message through stdin, and save, mark as last. 
- - proc main {} { if {![cmdline]} usage pullmessages } ADDED examples/nntp/sqlitestore Index: examples/nntp/sqlitestore ================================================================== --- /dev/null +++ examples/nntp/sqlitestore @@ -0,0 +1,388 @@ +#!/usr/bin/env tclsh +## -*- tcl -*- + +package require Tcl 8.5 +package require fileutil +package require sqlite3 +package require dbutil +package require try + +# Tcl 8.5 (de)compression support +package require zlibtcl +package require Trf + +proc zipit {msg} { zip -mode compress -level 9 $msg } +proc unzipit {msg} { zip -mode decompress $msg } + +# This application stores received nntp messages into a sqlite database file. +# The path of that file is specified on the command line. +# The article is read from stdin. +# +# The application supports the API expected by 'pullnews' for saving +# and id handling. + +# Signature (syntax) of the storage command: +# +# (1) <cmdprefix> last => Returns last id processed. +# (2) <cmdprefix> names => Returns list of all known ids +# (3) <cmdprefix> save <id> => Take message through stdin, and save, mark as last. +# (4) <cmdprefix> get <id> => Return message through stdout. +# (5) <cmdprefix> has <id> => Return boolean indicating existence of message +# +# Here, <cmdprefix> = sqlitestore ?options? <database> <peer-code> +# +# The peer-code is used to identify the origin of the messages in the +# database. It is chosen by the user. This means that sqlitestore is +# able to handle multiple incoming streams of messages and yet keep +# them separate. Messages are actually identified by their +# 'Message-Id:' header.
+ +proc main {} { + if {![cmdline]} usage + $::method +} + +proc cmdline {} { + global argv database peer method handle compress conflict + + set compress yes + set conflict error + + while {[string match -* [set o [lindex $argv 0]]]} { + switch -exact $o { + -P - --plain { set compress no } + -I - --ignore { set conflict ignore } + default { return 0 } + } + set argv [lrange $argv 1 end] + } + + if {[llength $argv] < 3} {return 0} + + # Retrieve arguments + set argv [lassign $argv database peer method] + + if {$method ni {last save get has names}} {return 0} + + if {$method in {save get has}} { + if {[llength $argv] != 1} {return 0} + set handle [lindex $argv 0] + } else { + if {[llength $argv] != 0} {return 0} + } + + validate store $database + return 1 +} + +proc validate {which path} { + if {![file exists $path]} { stop "$which does not exist: $path" } + if {![file isfile $path]} { stop "$which not a file: $path" } + if {![file readable $path]} { stop "$which not readable: $path" } + if {![file writable $path]} { stop "$which not writable: $path" } + + sqlite3 MSTORE $path + + # peer <-1:n- message_peer -m:1-> message + # - message_peer is the materialized n:m relation between peers and messages. + # - It further holds the id information we are getting from the peer, i.e. + # the peer-specific identification of a message. + + if {[dbutil initialize-schema MSTORE error \ + message { + { + -- Message data. Identified by the message-id value + -- from the message headers. The blob may be stored + -- zlib compressed. + + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + msgid TEXT NOT NULL UNIQUE, + msg BLOB NOT NULL + } { + {id INTEGER 1 {} 1} + {msgid TEXT 1 {} 0} + {msg BLOB 1 {} 0} + } {} + } peer { + { + -- Peers. Represent hosts from which message are + -- coming into the system. Just names. 
+ + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE + } { + {id INTEGER 1 {} 1} + {name TEXT 1 {} 0} + } {} + } message_peer { + { + -- Linkage between peers and messages. Each peer + -- identifies messages by a unique numeric handle + -- specific to the peer. Across multiple peers the + -- same message may (and likely will) have different + -- handles. + + pid INTEGER NOT NULL REFERENCES peer, + handle INTEGER NOT NULL, + mid INTEGER NOT NULL REFERENCES message, + PRIMARY KEY (pid,handle), + UNIQUE (pid,mid) + } { + {pid INTEGER 1 {} 1} + {handle INTEGER 1 {} 2} + {mid INTEGER 1 {} 0} + } {} + } header { + { + -- Header keys. Just names. + + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE + } { + {id INTEGER 1 {} 1} + {name TEXT 1 {} 0} + } {} + } message_header { + { + -- Linkage between headers and messages, plus the + -- value of the header in the message. + + hid INTEGER NOT NULL REFERENCES header, + mid INTEGER NOT NULL REFERENCES message, + value TEXT NOT NULL, + PRIMARY KEY (hid,mid) + } { + {hid INTEGER 1 {} 1} + {mid INTEGER 1 {} 2} + {value TEXT 1 {} 0} + } {{hid value}} + }]} return + stop "$which: $error" + return +} + +proc usage {} { + global argv0 + puts stderr "$argv0: wrong # args, should be \"$argv0 last|names|(save )|(get )\"" + exit 1 +} + +proc stop {text} { + global argv0 + puts stderr "$argv0: $text" + exit 1 +} + +proc last {} { + set pid [getpid] + + set id [MSTORE onecolumn { + SELECT MAX(handle) + FROM message_peer + WHERE pid = :pid + }] + + puts $id + return +} + +proc names {} { + set pid [getpid] + + puts [join [MSTORE eval { + SELECT handle + FROM message_peer + WHERE pid = :pid + ORDER BY handle + }] \n] + return +} + +proc save {} { + global peer handle conflict + + set msg [read stdin] + set mid [getmid $msg] + set pid [getpid] + + # Link message to peer, under the given id. 
+ + try { + MSTORE eval { + INSERT INTO message_peer + VALUES (:pid, :handle, :mid) + } + } on error {e o} { + if {$conflict eq "error"} { + # Rethrow + return {*}$o $e + } else { + puts -nonewline ignored + } + } + + return +} + +proc get {} { + global handle + + set pid [getpid] + set id [MSTORE eval { + SELECT mid + FROM message_peer + WHERE pid = :pid + AND handle = :handle + }] + + set msg [getblob $id] + + puts -nonewline stdout $msg + return +} + +proc has {} { + global handle + + set pid [getpid] + set id [MSTORE onecolumn { + SELECT handle + FROM message_peer + WHERE pid = :pid + AND handle = :handle + }] + puts -nonewline stdout [expr {$id ne {}}] + return +} + +proc getblob {id} { + set src [MSTORE incrblob -readonly message msg $id] + fconfigure $src -translation binary -encoding binary + set msg [read $src] + close $src + + # Try to decompress. Failure simply means that data was stored + # plain. + catch { + set msg [unzipit $msg] + } + return $msg +} + +proc getmid {msg} { + global compress + + lassign [process $msg] header body + + set msgid [dict get $header message-id] + set date [dict get $header date] + + set date [clock scan $date] + + dict unset header message-id + dict set header date $date + + MSTORE transaction { + set id [MSTORE onecolumn { + SELECT id + FROM message + WHERE msgid = :msgid + }] + if {$id eq {}} { + # Save unknown message. + + if {$compress} { + set msg [zipit $msg] + } + MSTORE eval { + INSERT INTO message + VALUES (NULL, :msgid, @msg) + } + set id [MSTORE last_insert_rowid] + + # Save and link all headers for searches + foreach k [lsort -dict [dict keys $header]] { + linkheader [gethid $k] $id [dict get $header $k] + } + } + } + return $id +} + +proc getpid {} { + global peer + + MSTORE transaction { + set id [MSTORE onecolumn { + SELECT id + FROM peer + WHERE name = :peer + }] + if {$id eq {}} { + # Save unknown peer. 
+ MSTORE eval { + INSERT INTO peer + VALUES (NULL, :peer) + } + set id [MSTORE last_insert_rowid] + } + } + return $id +} + +proc gethid {key} { + MSTORE transaction { + set id [MSTORE onecolumn { + SELECT id + FROM header + WHERE name = :key + }] + if {$id eq {}} { + # Save unknown header + MSTORE eval { + INSERT INTO header + VALUES (NULL, :key) + } + set id [MSTORE last_insert_rowid] + } + } + return $id +} + +proc linkheader {key msg value} { + MSTORE eval { + INSERT INTO message_header + VALUES (:key, :msg, :value) + } + return +} + +proc process {msg} { + set head {} + set body {} + set inBody 0 + set lastheader {} + + foreach line [split $msg "\n"] { + if {$inBody} { + lappend body $line + } elseif {[string length $line] == 0} { + set inBody 1 + } elseif {[regexp {^([^ :]+): +(.*)} $line => header value]} { + set header [string tolower $header] + set value [string trim $value] + if {[string length $value]} { + dict set head $header "$value " + } + set lastheader $header + } else { + dict append head $lastheader "[string trim $line] " + } + } + + return [list $head $body] +} + + +main +exit