From f1f263af5911f996c6b8958a5f5c6a02a88b7525 Mon Sep 17 00:00:00 2001 From: Max Lapshin Date: Fri, 5 Dec 2008 16:07:09 +0300 Subject: [PATCH] Added new db writer --- mysql2psql | 154 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 151 insertions(+), 3 deletions(-) diff --git a/mysql2psql b/mysql2psql index 0b4efb4..1047a11 100755 --- a/mysql2psql +++ b/mysql2psql @@ -45,7 +45,7 @@ class MysqlReader :primary_key => field.is_pri_key? } if field.is_pri_key? - @mysql.query("SELECT max(`#{field.name}`) FROM `#{name}`") do |res| + @mysql.query("SELECT max(`#{field.name}`) + 1 FROM `#{name}`") do |res| desc[:maxval] = res.fetch_row[0].to_i end end @@ -272,6 +272,149 @@ EOF end end +class PostgresDbWriter < Writer + def initialize(hostname, login, password, database) + require 'postgres' + @conn = PGconn.open('host' => hostname, 'user' => login, 'password' => password, 'dbname' => database) + @conn.exec("SET client_encoding = 'UTF8'") + @conn.exec("SET standard_conforming_strings = off") + @conn.exec("SET check_function_bodies = false") + @conn.exec("SET client_min_messages = warning") + end + + def write_table(table) + primary_keys = [] + primary_key = nil + maxval = nil + + columns = table.columns.map do |column| + if column[:primary_key] + if column[:name] == "id" + primary_key = column[:name] + maxval = column[:maxval] < 1 ? 1 : column[:maxval] + 1 + end + primary_keys << column[:name] + end + " " + column_description(column) + end.join(",\n") + + if primary_key + @conn.exec("DROP SEQUENCE IF EXISTS #{table.name}_#{primary_key}_seq CASCADE") + @conn.exec <<-EOF + CREATE SEQUENCE #{table.name}_#{primary_key}_seq + INCREMENT BY 1 + NO MAXVALUE + NO MINVALUE + CACHE 1 + EOF + + @conn.exec "SELECT pg_catalog.setval('#{table.name}_#{primary_key}_seq', #{maxval}, true)" + end + + @conn.exec "DROP TABLE IF EXISTS #{PGconn.quote_ident(table.name)} CASCADE;" + table_sql = "CREATE TABLE #{PGconn.quote_ident(table.name)} (\n" + columns + + if primary_index = table.indexes.find {|index| index[:primary]} + table_sql << ",\n CONSTRAINT #{table.name}_pkey PRIMARY KEY(#{primary_index[:columns].map {|col| PGconn.quote_ident(col)}.join(", ")})" + end + + table_sql << "\n)\nWITH (OIDS=FALSE);" + @conn.exec(table_sql) + puts "Created table #{table.name}" + + table.indexes.each do |index| + next if index[:primary] + unique = index[:unique] ? "UNIQUE " : nil + @conn.exec("DROP INDEX IF EXISTS #{PGconn.quote_ident(index[:name])} CASCADE;") + @conn.exec("CREATE #{unique}INDEX #{PGconn.quote_ident(index[:name])} ON #{PGconn.quote_ident(table.name)} (#{index[:columns].map {|col| PGconn.quote_ident(col)}.join(", ")});") + end + + end + + def column_description(column) + "#{PGconn.quote_ident(column[:name])} #{column_type(column)}" + end + + def column_type(column) + case + when column[:primary_key] && column[:name] == "id" + "integer DEFAULT nextval('#{column[:table_name]}_#{column[:name]}_seq'::regclass) NOT NULL" + else + default = column[:default] ? " DEFAULT #{column[:default] == nil ? 'NULL' : "'"+column[:default]+"'"}" : nil + null = column[:null] ? "" : " NOT NULL" + type = + case column[:type] + when "var_string" + default = default + "::character varying" if default + "character varying(#{column[:length]})" + when "long" + default = " DEFAULT #{column[:default].nil? ? 'NULL' : column[:default]}" if default + "bigint" + when "longlong" + default = " DEFAULT #{column[:default].nil? ? 'NULL' : column[:default]}" if default + "bigint" + when "datetime" + default = nil + "timestamp without time zone" + when "date" + default = nil + "date" + when "char" + if default + default = " DEFAULT #{column[:default].to_i == 1 ? 'true' : 'false'}" + end + "boolean" + when "blob" + "text" + when "float" + default = " DEFAULT #{column[:default].nil? ? 'NULL' : column[:default]}" if default + "numeric(#{column[:length] + column[:decimals]}, #{column[:decimals]})" + when "decimal" + default = " DEFAULT #{column[:default].nil? ? 'NULL' : column[:default]}" if default + "numeric(#{column[:length] + column[:decimals]}, #{column[:decimals]})" + else + puts column.inspect + column[:type].inspect + end + "#{type}#{default}#{null}" + end + end + + def write_contents(table, reader) + _time1 = Time.now + @conn.exec("COPY #{table.name} (#{table.columns.map {|column| PGconn.quote_ident(column[:name])}.join(", ")}) FROM stdin;") + print "Loading #{table.name}: " + STDOUT.flush + counter = 0 + reader.paginated_read(table, 1000) do |row| + line = [] + table.columns.each_with_index do |column, index| + row[index] = row[index].to_s if row[index].is_a?(Mysql::Time) + if column[:type] == "char" + row[index] = row[index] == 1 ? 't' : row[index] == 0 ? 'f' : row[index] + end + row[index] = row[index].gsub(/\\/, '\\\\\\').gsub(/\n/,'\n').gsub(/\t/,'\t').gsub(/\r/,'\r') if row[index].is_a?(String) + row[index] = '\N' if !row[index] + row[index] + end + @conn.putline row.join("\t") + "\n" + counter += 1 + if counter % 1000 == 0 + print "." + STDOUT.flush + end + end + _time2 = Time.now + puts " #{counter} (#{((_time2 - _time1) / 60).round}min #{((_time2 - _time1) % 60).round}s)" + @conn.putline(".\n") + end + + def close + @conn.close + end +end + + class Converter attr_reader :reader, :writer @@ -281,19 +424,24 @@ class Converter end def convert + _time1 = Time.now reader.tables.each do |table| writer.write_table(table) end + _time2 = Time.now reader.tables.each do |table| writer.write_contents(table, reader) end writer.close + _time3 = Time.now + puts "Table creation #{((_time2 - _time1) / 60).round} min, total #{((_time3 - _time1) / 60).round} min" end end -reader = MysqlReader.new('localhost', 'root', nil, $ARGV[1]) -writer = PostgresFileWriter.new($ARGV[2] || "output.sql") +reader = MysqlReader.new('localhost', 'root', nil, 'lookatme_development') +#writer = PostgresFileWriter.new($ARGV[2] || "output.sql") +writer = PostgresDbWriter.new('localhost', 'lookatme', '123', 'lookatme_development') converter = Converter.new(reader, writer) converter.convert -- 2.1.4