#!/usr/bin/env ruby
# encoding: utf-8
# ODBA::Storage -- odba -- 08.12.2011 -- mhatakeyama@ywesee.com
# ODBA::Storage -- odba -- 29.04.2004 -- hwyss@ywesee.com rwaltert@ywesee.com mwalder@ywesee.com

require 'thread'
require 'singleton'
require 'dbi'

module ODBA
  # Storage is ODBA's relational persistence layer. It stores marshalled
  # object dumps, object-to-object connections, serialized collections and
  # several kinds of search indices (plain, condition, fulltext) through a
  # DBI handle. The SQL dialect used for the fulltext features is
  # PostgreSQL (tsvector / tsquery / text search configurations).
  #
  # Access the one instance via Storage.instance (Singleton); assign the
  # database handle with Storage.instance.dbi = <DBI handle>.
  class Storage # :nodoc: all
    include Singleton
    attr_writer :dbi

    # Number of odba_ids interpolated into a single IN (...) clause when
    # fetching or deleting in bulk.
    BULK_FETCH_STEP = 2500

    TABLES = [
      # in table 'object', the isolated dumps of all objects are stored
      ['object', <<-'SQL'],
        CREATE TABLE object (
          odba_id INTEGER NOT NULL,
          content TEXT,
          name TEXT,
          prefetchable BOOLEAN,
          extent TEXT,
          PRIMARY KEY(odba_id),
          UNIQUE(name)
        );
      SQL
      ['prefetchable_index', <<-SQL],
        CREATE INDEX prefetchable_index ON object(prefetchable);
      SQL
      ['extent_index', <<-SQL],
        CREATE INDEX extent_index ON object(extent);
      SQL
      # helper table 'object_connection'
      ['object_connection', <<-'SQL'],
        CREATE TABLE object_connection (
          origin_id integer,
          target_id integer,
          PRIMARY KEY(origin_id, target_id)
        );
      SQL
      ['target_id_index', <<-SQL],
        CREATE INDEX target_id_index ON object_connection(target_id);
      SQL
      # helper table 'collection'
      ['collection', <<-'SQL'],
        CREATE TABLE collection (
          odba_id integer NOT NULL,
          key text,
          value text,
          PRIMARY KEY(odba_id, key)
        );
      SQL
    ]

    def initialize
      # guards lazy initialization and hand-out of odba_ids
      @id_mutex = Mutex.new
    end

    # Fetches the dumps for all given odba_ids in slices of
    # BULK_FETCH_STEP (to keep the IN (...) clause bounded).
    # Returns an Array of [odba_id, content] rows.
    def bulk_restore(bulk_fetch_ids)
      if(bulk_fetch_ids.empty?)
        []
      else
        # work on a uniq'd copy; slice! below consumes it destructively
        bulk_fetch_ids = bulk_fetch_ids.uniq
        rows = []
        while(!(ids = bulk_fetch_ids.slice!(0, BULK_FETCH_STEP)).empty?)
          sql = <<-SQL
            SELECT odba_id, content FROM object
            WHERE odba_id IN (#{ids.join(',')})
          SQL
          rows.concat(self.dbi.select_all(sql))
        end
        rows
      end
    end

    # Returns the serialized value stored under (odba_id, key_dump),
    # or nil if there is no such entry.
    def collection_fetch(odba_id, key_dump)
      sql = <<-SQL
        SELECT value FROM collection
        WHERE odba_id = ? AND key = ?
      SQL
      row = self.dbi.select_one(sql, odba_id, key_dump)
      row.first unless row.nil?
    end

    # Removes a single (odba_id, key_dump) entry from the collection table.
    def collection_remove(odba_id, key_dump)
      self.dbi.do <<-SQL, odba_id, key_dump
        DELETE FROM collection
        WHERE odba_id = ? AND key = ?
      SQL
    end

    # Inserts a single (odba_id, key_dump, value_dump) entry into the
    # collection table.
    def collection_store(odba_id, key_dump, value_dump)
      self.dbi.do <<-SQL, odba_id, key_dump, value_dump
        INSERT INTO collection (odba_id, key, value)
        VALUES (?, ?, ?)
      SQL
    end

    # Deletes rows from a condition-index table. origin_id may be nil
    # (matched with IS); search_terms is a Hash of column => value pairs
    # further restricting the delete; target_id is optional.
    # NOTE(review): search_term keys are interpolated as column names —
    # they must come from trusted index definitions, not user input.
    def condition_index_delete(index_name, origin_id,
                               search_terms, target_id=nil)
      values = []
      sql = "DELETE FROM #{index_name}"
      if(origin_id)
        sql << " WHERE origin_id = ?"
      else
        sql << " WHERE origin_id IS ?"
      end
      search_terms.each { |key, value|
        sql << " AND %s = ?" % key
        values << value
      }
      if(target_id)
        sql << " AND target_id = ?"
        values << target_id
      end
      self.dbi.do sql, origin_id, *values
    end

    # Returns all distinct rows of a condition-index table where the
    # column named id_name equals id.
    def condition_index_ids(index_name, id, id_name)
      sql = <<-SQL
        SELECT DISTINCT *
        FROM #{index_name}
        WHERE #{id_name}=?
      SQL
      self.dbi.select_all(sql, id)
    end

    # Configures the token-type mappings of the text search configuration
    # default_<language> (PostgreSQL full text search).
    def create_dictionary_map(language)
      self.dbi.do <<-SQL
        ALTER TEXT SEARCH CONFIGURATION default_#{language}
        ALTER MAPPING FOR
          asciiword, asciihword, hword_asciipart,
          word, hword, hword_part, hword_numpart,
          numword, numhword
        WITH #{language}_ispell, #{language}_stem;
      SQL
      self.dbi.do <<-SQL
        ALTER TEXT SEARCH CONFIGURATION default_#{language}
        ALTER MAPPING FOR
          host, file, int, uint, version
        WITH simple;
      SQL
      # drop from default setting
      self.dbi.do <<-SQL
        ALTER TEXT SEARCH CONFIGURATION default_#{language}
        DROP MAPPING FOR
          email, url, url_path, sfloat, float
      SQL
    end

    # Creates a condition-index table with columns origin_id,
    # <name> <datatype> for every pair in definition, and target_id —
    # each of them indexed.
    def create_condition_index(table_name, definition)
      self.dbi.do <<-SQL
        CREATE TABLE #{table_name} (
          origin_id INTEGER,
          #{definition.collect { |*pair| pair.join(' ') }.join(",\n  ")},
          target_id INTEGER
        );
      SQL
      # index origin_id
      self.dbi.do <<-SQL
        CREATE INDEX origin_id_#{table_name}
        ON #{table_name}(origin_id);
      SQL
      # index all search-term columns
      definition.each { |name, datatype|
        self.dbi.do <<-SQL
          CREATE INDEX #{name}_#{table_name}
          ON #{table_name}(#{name});
        SQL
      }
      # index target_id
      self.dbi.do <<-SQL
        CREATE INDEX target_id_#{table_name}
        ON #{table_name}(target_id);
      SQL
    end

    # Creates a fulltext-index table (search_term is a PostgreSQL
    # tsvector, indexed with GiST).
    def create_fulltext_index(table_name)
      self.dbi.do <<-SQL
        CREATE TABLE #{table_name} (
          origin_id INTEGER,
          search_term tsvector,
          target_id INTEGER
        );
      SQL
      # index origin_id
      self.dbi.do <<-SQL
        CREATE INDEX origin_id_#{table_name}
        ON #{table_name}(origin_id);
      SQL
      # index search_term
      self.dbi.do <<-SQL
        CREATE INDEX search_term_#{table_name}
        ON #{table_name} USING gist(search_term);
      SQL
      # index target_id
      self.dbi.do <<-SQL
        CREATE INDEX target_id_#{table_name}
        ON #{table_name}(target_id);
      SQL
    end

    # Creates a plain index table (search_term is TEXT, matched with
    # LIKE in retrieve_from_index).
    def create_index(table_name)
      self.dbi.do <<-SQL
        CREATE TABLE #{table_name} (
          origin_id INTEGER,
          search_term TEXT,
          target_id INTEGER
        );
      SQL
      # index origin_id
      self.dbi.do <<-SQL
        CREATE INDEX origin_id_#{table_name}
        ON #{table_name}(origin_id)
      SQL
      # index search_term
      self.dbi.do <<-SQL
        CREATE INDEX search_term_#{table_name}
        ON #{table_name}(search_term)
      SQL
      # index target_id
      self.dbi.do <<-SQL
        CREATE INDEX target_id_#{table_name}
        ON #{table_name}(target_id)
      SQL
    end

    # Returns the transaction-local DBI handle when called inside
    # #transaction, the globally assigned handle otherwise.
    def dbi
      Thread.current[:txn] || @dbi
    end

    # Drops an index table created by one of the create_*_index methods.
    def drop_index(index_name)
      self.dbi.do "DROP TABLE #{index_name}"
    end

    # Deletes all rows of an index table where the column named id_name
    # equals odba_id.
    def delete_index_element(index_name, odba_id, id_name)
      self.dbi.do <<-SQL, odba_id
        DELETE FROM #{index_name} WHERE #{id_name} = ?
      SQL
    end

    # Removes every trace of a persistable: its connections (as origin
    # and as target), its collection entries and its dump.
    def delete_persistable(odba_id)
      # delete origin from connections
      self.dbi.do <<-SQL, odba_id
        DELETE FROM object_connection
        WHERE origin_id = ?
      SQL
      # delete target from connections
      self.dbi.do <<-SQL, odba_id
        DELETE FROM object_connection
        WHERE target_id = ?
      SQL
      # delete from collections
      self.dbi.do <<-SQL, odba_id
        DELETE FROM collection
        WHERE odba_id = ?
      SQL
      # delete from objects
      self.dbi.do <<-SQL, odba_id
        DELETE FROM object
        WHERE odba_id = ?
      SQL
    end

    # Synchronizes the object_connection table for origin_id with
    # target_ids: connections no longer present are deleted (in slices of
    # BULK_FETCH_STEP), new ones are inserted.
    def ensure_object_connections(origin_id, target_ids)
      sql = <<-SQL
        SELECT target_id FROM object_connection
        WHERE origin_id = ?
      SQL
      target_ids.uniq!
      update_ids = target_ids
      old_ids = []
      ## use self.dbi instead of @dbi to get information about
      ## object_connections previously stored within this transaction
      if(rows = self.dbi.select_all(sql, origin_id))
        old_ids = rows.collect { |row| row[0] }
        old_ids.uniq!
        delete_ids = old_ids - target_ids
        update_ids = target_ids - old_ids
        unless(delete_ids.empty?)
          while(!(ids = delete_ids.slice!(0, BULK_FETCH_STEP)).empty?)
            self.dbi.do <<-SQL, origin_id
              DELETE FROM object_connection
              WHERE origin_id = ? AND target_id IN (#{ids.join(',')})
            SQL
          end
        end
      end
      sth = self.dbi.prepare <<-SQL
        INSERT INTO object_connection (origin_id, target_id)
        VALUES (?, ?)
      SQL
      update_ids.each { |id|
        sth.execute(origin_id, id)
      }
    rescue DBI::ProgrammingError => e
      # NOTE(review): debug logging left in place to preserve behavior
      $stdout.puts "ensure_object_connections: rescue DBI::ProgrammingError #{origin_id} target_ids #{target_ids} sql #{sql}\nEnd of SQL"
      $stdout.puts e.inspect
    ensure
      $stdout.puts "ensure_object_connections: ensure #{origin_id} target_ids #{target_ids} sql #{sql}"
      # sth is nil when the error occurred before the INSERT was prepared;
      # finishing only here avoids the previous double sth.finish
      sth.finish if sth
    end

    # Best-effort creation of a target_id index on table_name; errors
    # (e.g. the index already exists) are deliberately swallowed.
    def ensure_target_id_index(table_name)
      # index target_id
      self.dbi.do <<-SQL
        CREATE INDEX target_id_#{table_name}
        ON #{table_name}(target_id)
      SQL
    rescue
    end

    # Returns the number of stored objects belonging to the extent of
    # klass (compared by class name).
    def extent_count(klass)
      self.dbi.select_one(<<-EOQ, klass.to_s).first
        SELECT COUNT(odba_id) FROM object
        WHERE extent = ?
      EOQ
    end

    # Returns all odba_ids belonging to the extent of klass.
    def extent_ids(klass)
      self.dbi.select_all(<<-EOQ, klass.to_s).flatten
        SELECT odba_id FROM object
        WHERE extent = ?
      EOQ
    end

    # Deletes all rows of a fulltext-index table where the column named
    # id_name equals id.
    def fulltext_index_delete(index_name, id, id_name)
      self.dbi.do <<-SQL, id
        DELETE FROM #{index_name}
        WHERE #{id_name} = ?
      SQL
    end

    # Returns the distinct target_ids indexed for origin_id.
    def fulltext_index_target_ids(index_name, origin_id)
      sql = <<-SQL
        SELECT DISTINCT target_id
        FROM #{index_name}
        WHERE origin_id=?
      SQL
      self.dbi.select_all(sql, origin_id)
    end

    # Creates the PostgreSQL text search configuration and ispell
    # dictionary for language, provided the required dictionary files
    # (<language>_<file>.dict/.affix/.stop) exist under data_path.
    def generate_dictionary(language, data_path='/usr/share/postgresql/tsearch_data/', file='fulltext')
      found = true
      %w{dict affix stop}.each do |ext|
        filename = "#{language}_#{file}.#{ext}"
        source = File.join(data_path, filename)
        # File.exist? -- File.exists? was removed in Ruby 3.2
        unless File.exist?(source)
          puts "ERROR: \"#{filename}\" does not exist in #{data_path}."
          found = false
        end
      end
      return unless found
      # setup configuration
      self.dbi.do <<-SQL
        CREATE TEXT SEARCH CONFIGURATION public.default_#{language} (
          COPY = pg_catalog.#{language}
        );
      SQL
      # ispell
      self.dbi.do <<-SQL
        CREATE TEXT SEARCH DICTIONARY #{language}_ispell (
          TEMPLATE = ispell,
          DictFile = #{language}_#{file},
          AffFile = #{language}_#{file},
          StopWords = #{language}_#{file}
        );
      SQL
      # stem is already there.
      create_dictionary_map(language)
    end

    # Deletes index rows matching (origin_id, search_term).
    def index_delete_origin(index_name, odba_id, term)
      self.dbi.do <<-SQL, odba_id, term
        DELETE FROM #{index_name}
        WHERE origin_id = ?
        AND search_term = ?
      SQL
    end

    # Deletes index rows matching (origin_id, search_term, target_id).
    def index_delete_target(index_name, origin_id, search_term, target_id)
      self.dbi.do <<-SQL, origin_id, search_term, target_id
        DELETE FROM #{index_name}
        WHERE origin_id = ?
        AND search_term = ?
        AND target_id = ?
      SQL
    end

    # Returns the sorted, distinct search terms of an index table,
    # optionally truncated to the first length characters.
    def index_fetch_keys(index_name, length=nil)
      expr = if(length)
        "substr(search_term, 1, #{length})"
      else
        "search_term"
      end
      sql = <<-SQL
        SELECT DISTINCT #{expr} AS key
        FROM #{index_name}
        ORDER BY key
      SQL
      self.dbi.select_all(sql).flatten
    end

    # Returns the sorted, distinct search terms starting with substring,
    # with optional LIMIT/OFFSET paging.
    def index_matches(index_name, substring, limit=nil, offset=0)
      sql = <<-SQL
        SELECT DISTINCT search_term AS key
        FROM #{index_name}
        WHERE search_term LIKE ?
        ORDER BY key
      SQL
      if limit
        sql << "LIMIT #{limit}\n"
      end
      if offset > 0
        sql << "OFFSET #{offset}\n"
      end
      self.dbi.select_all(sql, substring + '%').flatten
    end

    # Returns distinct [origin_id, search_term] pairs for target_id.
    def index_origin_ids(index_name, target_id)
      sql = <<-SQL
        SELECT DISTINCT origin_id, search_term
        FROM #{index_name}
        WHERE target_id=?
      SQL
      self.dbi.select_all(sql, target_id)
    end

    # Returns distinct [target_id, search_term] pairs for origin_id.
    def index_target_ids(index_name, origin_id)
      sql = <<-SQL
        SELECT DISTINCT target_id, search_term
        FROM #{index_name}
        WHERE origin_id=?
      SQL
      self.dbi.select_all(sql, origin_id)
    end

    # Returns the highest odba_id handed out so far (without consuming one).
    def max_id
      @id_mutex.synchronize do
        ensure_next_id_set
        @next_id
      end
    end

    # Hands out the next free odba_id (thread-safe).
    def next_id
      @id_mutex.synchronize do
        ensure_next_id_set
        @next_id += 1
      end
    end

    # Overwrites the id counter; subsequent next_id calls continue from id.
    def update_max_id(id)
      @id_mutex.synchronize do
        @next_id = id
      end
    end

    # Advances the id counter to reserved_id. Raises OdbaDuplicateIdError
    # if reserved_id has already been handed out.
    def reserve_next_id(reserved_id)
      @id_mutex.synchronize do
        ensure_next_id_set
        if @next_id < reserved_id
          @next_id = reserved_id
        else
          raise OdbaDuplicateIdError,
                "The id '#{reserved_id}' has already been assigned"
        end
      end
    end

    # Drops the text search configuration and ispell dictionary that
    # generate_dictionary created for language.
    def remove_dictionary(language)
      # remove configuration
      self.dbi.do <<-SQL
        DROP TEXT SEARCH CONFIGURATION IF EXISTS default_#{language}
      SQL
      # remove ispell dictionaries
      self.dbi.do <<-SQL
        DROP TEXT SEARCH DICTIONARY IF EXISTS #{language}_ispell;
      SQL
    end

    # Returns the dump stored for odba_id, or nil if unknown.
    def restore(odba_id)
      row = self.dbi.select_one("SELECT content FROM object WHERE odba_id = ?", odba_id)
      row.first unless row.nil?
    end

    # Returns the origin_ids of all objects connected to target_id.
    def retrieve_connected_objects(target_id)
      sql = <<-SQL
        SELECT origin_id FROM object_connection
        WHERE target_id = ?
      SQL
      self.dbi.select_all(sql, target_id)
    end

    # Queries a condition-index table. conditions is an Array/Hash of
    # [name, info] pairs where info is either a plain value (equality),
    # a Hash with 'condition' (operator, e.g. 'like') and 'value' keys,
    # or nil/false for an IS NULL match. Returns [target_id, relevance]
    # rows, relevance being the number of matching index entries.
    def retrieve_from_condition_index(index_name, conditions, limit=nil)
      sql = <<-EOQ
        SELECT target_id, COUNT(target_id) AS relevance
        FROM #{index_name}
        WHERE TRUE
      EOQ
      values = []
      conditions.each { |name, info|
        val = nil
        condition = nil
        if(info.is_a?(Hash))
          condition = info['condition']
          if(val = info['value'])
            if(/i?like/i.match(condition))
              # LIKE / ILIKE get a trailing wildcard
              val += '%'
            end
            condition = "#{condition || '='} ?"
            values.push(val.to_s)
          end
        elsif(info)
          condition = "= ?"
          values.push(info.to_s)
        end
        sql << <<-EOQ
          AND #{name} #{condition || 'IS NULL'}
        EOQ
      }
      sql << " GROUP BY target_id\n"
      if(limit)
        sql << " LIMIT #{limit}"
      end
      self.dbi.select_all(sql, *values)
    end

    # Fulltext query against a tsvector index table. Returns
    # [target_id, relevance] rows ordered by ts_rank relevance; returns
    # [] when the search term cannot be parsed into a valid tsquery.
    def retrieve_from_fulltext_index(index_name, search_term, dict, limit=nil)
      ## this combination of gsub statements solves the problem of
      #  properly escaping strings of this form: "(2:1)" into
      #  '\(2\:1\)' (see test_retrieve_from_fulltext_index)
      term = search_term.strip.gsub(/\s+/, '&').gsub(/&+/, '&')\
        .gsub(/[():]/i, '\\ \\&').gsub(/\s/, '')
      sql = <<-EOQ
        SELECT target_id,
          max(ts_rank(search_term, to_tsquery(?, ?))) AS relevance
        FROM #{index_name}
        WHERE search_term @@ to_tsquery(?, ?)
        GROUP BY target_id
        ORDER BY relevance DESC
      EOQ
      if(limit)
        sql << " LIMIT #{limit}"
      end
      self.dbi.select_all(sql, dict, term, dict, term)
    rescue DBI::ProgrammingError => e
      warn("ODBA::Storage.retrieve_from_fulltext_index rescued a DBI::ProgrammingError(#{e.message}). Query:")
      warn("self.dbi.select_all(#{sql}, #{dict}, #{term}, #{dict}, #{term})")
      warn("returning empty result")
      []
    end

    # Prefix (or exact) search on a plain index table. Returns
    # [target_id, relevance] rows.
    def retrieve_from_index(index_name, search_term, exact=nil, limit=nil)
      unless(exact)
        search_term = search_term + "%"
      end
      sql = <<-EOQ
        SELECT target_id, COUNT(target_id) AS relevance
        FROM #{index_name}
        WHERE search_term LIKE ?
        GROUP BY target_id
      EOQ
      if(limit)
        sql << " LIMIT #{limit}"
      end
      self.dbi.select_all(sql, search_term)
    end

    # Returns all [key, value] pairs stored for odba_id.
    def restore_collection(odba_id)
      # parameterized for consistency with the other collection queries
      self.dbi.select_all <<-EOQ, odba_id
        SELECT key, value FROM collection
        WHERE odba_id = ?
      EOQ
    end

    # Returns the dump stored under the unique name, or nil if unknown.
    def restore_named(name)
      row = self.dbi.select_one("SELECT content FROM object WHERE name = ?",
                                name)
      row.first unless row.nil?
    end

    # Returns [odba_id, content] rows of all prefetchable objects.
    def restore_prefetchable
      self.dbi.select_all <<-EOQ
        SELECT odba_id, content FROM object
        WHERE prefetchable = true
      EOQ
    end

    # Creates all base tables (ignoring those that already exist) and
    # migrates pre-'extent' schemas by adding the extent column + index.
    def setup
      TABLES.each { |name, definition|
        begin
          self.dbi.do(definition)
        rescue DBI::ProgrammingError
          # table exists already -- previously an inline 'rescue' modifier
          # here silently swallowed *all* StandardErrors
        end
      }
      unless(self.dbi.columns('object').any? { |col| col.name == 'extent' })
        self.dbi.do <<-EOS
          ALTER TABLE object ADD COLUMN extent TEXT;
          CREATE INDEX extent_index ON object(extent);
        EOS
      end
    end

    # Inserts or updates the dump of odba_id together with its name,
    # prefetchable flag and extent (class name). An existing name is
    # preserved when the caller passes name=nil.
    def store(odba_id, dump, name, prefetchable, klass)
      sql = "SELECT name FROM object WHERE odba_id = ?"
      if(row = self.dbi.select_one(sql, odba_id))
        name ||= row['name']
        self.dbi.do <<-SQL, dump, name, prefetchable, klass.to_s, odba_id
          UPDATE object SET
            content = ?,
            name = ?,
            prefetchable = ?,
            extent = ?
          WHERE odba_id = ?
        SQL
      else
        self.dbi.do <<-SQL, odba_id, dump, name, prefetchable, klass.to_s
          INSERT INTO object (odba_id, content, name, prefetchable, extent)
          VALUES (?, ?, ?, ?, ?)
        SQL
      end
    end

    # Runs block inside a database transaction. The transaction handle is
    # published via Thread.current[:txn] so that all #dbi calls within the
    # block (in this thread) share it. Returns the block's value.
    def transaction(&block)
      retval = nil
      @dbi.transaction { |handle|
        ## this should not be necessary anymore:
        #handle['AutoCommit'] = false
        Thread.current[:txn] = handle
        retval = block.call
      }
      retval
    ensure
      Thread.current[:txn] = nil
    end

    # Inserts (when target_id is given) or updates (when it is nil) a row
    # of a condition-index table. search_terms is a Hash of
    # column => value pairs.
    def update_condition_index(index_name, origin_id, search_terms, target_id)
      keys = []
      vals = []
      search_terms.each { |key, val|
        keys.push(key)
        vals.push(val)
      }
      if(target_id)
        self.dbi.do <<-SQL, origin_id, target_id, *vals
          INSERT INTO #{index_name} (origin_id, target_id, #{keys.join(', ')})
          VALUES (?, ?#{', ?' * keys.size})
        SQL
      else
        key_str = keys.collect { |key| "#{key}=?" }.join(', ')
        self.dbi.do <<-SQL, *(vals.push(origin_id))
          UPDATE #{index_name} SET #{key_str}
          WHERE origin_id = ?
        SQL
      end
    end

    # Inserts (when target_id is given) or updates (when it is nil) a
    # fulltext-index row; search_term is normalized and converted to a
    # tsvector with the given dictionary.
    def update_fulltext_index(index_name, origin_id, search_term, target_id, dict)
      search_term = search_term.gsub(/\s+/, ' ').strip
      if(target_id)
        self.dbi.do <<-SQL, origin_id, dict, search_term, target_id
          INSERT INTO #{index_name} (origin_id, search_term, target_id)
          VALUES (?, to_tsvector(?, ?), ?)
        SQL
      else
        self.dbi.do <<-SQL, dict, search_term, origin_id
          UPDATE #{index_name} SET search_term=to_tsvector(?, ?)
          WHERE origin_id=?
        SQL
      end
    end

    # Inserts (when target_id is given) or updates (when it is nil) a row
    # of a plain index table.
    def update_index(index_name, origin_id, search_term, target_id)
      if(target_id)
        self.dbi.do <<-SQL, origin_id, search_term, target_id
          INSERT INTO #{index_name} (origin_id, search_term, target_id)
          VALUES (?, ?, ?)
        SQL
      else
        self.dbi.do <<-SQL, search_term, origin_id
          UPDATE #{index_name} SET search_term=?
          WHERE origin_id=?
        SQL
      end
    end

    private

    # Lazily initializes the id counter from the database.
    # Callers must hold @id_mutex.
    def ensure_next_id_set
      if(@next_id.nil?)
        @next_id = restore_max_id
      end
    end

    # Returns the highest odba_id present in the object table, 0 for an
    # empty database.
    def restore_max_id
      row = self.dbi.select_one("SELECT odba_id FROM object ORDER BY odba_id DESC LIMIT 1")
      unless(row.nil? || row.first.nil?)
        row.first
      else
        0
      end
    end
  end
end