-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathupsert.rb
More file actions
245 lines (217 loc) · 7.34 KB
/
upsert.rb
File metadata and controls
245 lines (217 loc) · 7.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
require 'bigdecimal'
require 'thread'
require 'logger'
require 'upsert/version'
require 'upsert/binary'
require 'upsert/connection'
require 'upsert/merge_function'
require 'upsert/column_definition'
require 'upsert/row'
# Entry point of the library: wraps a raw database connection and upserts
# rows into a single table through an adapter-specific merge function.
#
# The class methods are shared helpers (logger, connection/adapter detection,
# UTC time formatting); an instance is bound to one connection and one table.
class Upsert
  class << self
    # What logger to use.
    # @return [#info,#warn,#debug]
    attr_writer :logger

    # The current logger.
    #
    # When none has been assigned, one is chosen lazily: Rails.logger, then
    # ActiveRecord::Base.logger, then a Logger on $stderr. The choice is made
    # under a mutex so concurrent first calls agree on a single instance.
    #
    # @return [#info,#warn,#debug]
    def logger
      return @logger if @logger
      # Thread.exclusive no longer exists (removed in Ruby 3.0); a plain Mutex
      # provides the same "only one thread builds the default" guarantee.
      LOGGER_MUTEX.synchronize { @logger ||= build_default_logger }
    end

    # @param [Mysql2::Client,Sqlite3::Database,PG::Connection,#metal] connection A supported database connection.
    #
    # Clear any database functions that may have been created.
    #
    # Currently only applies to PostgreSQL.
    def clear_database_functions(connection)
      # A throwaway instance is only needed to resolve the adapter; the table name is never used.
      dummy = new(connection, :dummy)
      dummy.clear_database_functions
    end

    # @param [String] v A string containing binary data that should be inserted/escaped as such.
    #
    # @return [Upsert::Binary]
    def binary(v)
      Binary.new v
    end

    # More efficient way of upserting multiple rows at once.
    #
    # @param [Mysql2::Client,Sqlite3::Database,PG::Connection,#metal] connection A supported database connection.
    # @param [String,Symbol] table_name The name of the table into which you will be upserting.
    # @param [Hash] options Passed through unchanged to {Upsert#initialize}.
    #
    # @yield [Upsert] An +Upsert+ object in batch mode. You can call #row on it multiple times and it will try to optimize on speed.
    #
    # @return [Object] whatever the block returns
    #
    # @example Many at once
    #   Upsert.batch(Pet.connection, Pet.table_name) do |upsert|
    #     upsert.row({:name => 'Jerry'}, :breed => 'beagle')
    #     upsert.row({:name => 'Pierre'}, :breed => 'tabby')
    #   end
    def batch(connection, table_name, options = {})
      upsert = new connection, table_name, options
      yield upsert
    end

    # @deprecated Use .batch instead.
    alias :stream :batch

    # Name of the raw connection's class.
    #
    # @private
    def class_name(metal)
      if RUBY_PLATFORM == 'java'
        # Falls back to the Java class name when the object has no Ruby class name.
        metal.class.name || metal.get_class.name
      else
        metal.class.name
      end
    end

    # Database family ('Sqlite3', 'Mysql' or 'Postgresql'), guessed from the
    # raw connection's class name.
    #
    # @raise [RuntimeError] if the class name matches none of the known families
    #
    # @private
    def flavor(metal)
      case class_name(metal)
      when /sqlite/i
        'Sqlite3'
      when /mysql/i
        'Mysql'
      when /pg/i, /postgres/i
        'Postgresql'
      else
        raise "[upsert] #{metal} not supported"
      end
    end

    # Constant-friendly adapter name: the (possibly aliased) class name of the
    # raw connection with every run of non-word characters replaced by "_",
    # e.g. "PG::Connection" becomes "PG_Connection".
    #
    # @private
    def adapter(metal)
      metal_class_name = class_name metal
      METAL_CLASS_ALIAS.fetch(metal_class_name, metal_class_name).gsub(/\W+/, '_')
    end

    # Unwrap +connection+ down to the underlying driver object.
    #
    # @private
    def metal(connection)
      raw = connection.respond_to?(:raw_connection) ? connection.raw_connection : connection
      # An ActiveRecord adapter is itself a wrapper; go one level deeper.
      if raw.class.name.to_s.start_with?('ActiveRecord::ConnectionAdapters')
        raw = raw.connection
      end
      raw
    end

    # Express +time+ in UTC without modifying the object that was passed in.
    #
    # @param [Time,DateTime,#utc?] time
    # @return [Time] a UTC time (the argument itself when it is already UTC)
    #
    # @private
    def utc(time)
      # DateTime only exists once 'date' has been loaded; if it has not been,
      # +time+ cannot be a DateTime, so the check must not raise NameError.
      if defined?(::DateTime) && time.is_a?(::DateTime)
        # SEC_FRACTION scales sec_fraction to microseconds (the factor differs by Ruby version).
        usec = time.sec_fraction * SEC_FRACTION
        time = time.new_offset(0) if time.offset != 0
        Time.utc time.year, time.month, time.day, time.hour, time.min, time.sec, usec
      elsif time.utc?
        time
      else
        # getutc returns a new object; Time#utc would convert the caller's object in place.
        time.getutc
      end
    end

    # Format +time+ in UTC as "YYYY-MM-DD HH:MM:SS.UUUUUU", followed by
    # "+00:00" unless +tz+ is false.
    #
    # @private
    def utc_iso8601(time, tz = true)
      t = utc time
      s = t.strftime(ISO8601_DATETIME) + '.' + (USEC_SPRINTF % t.usec)
      tz ? (s + UTC_TZ) : s
    end

    private

    # Pick the default logger: Rails', then ActiveRecord's, then a new Logger
    # on $stderr whose level can be forced with ENV['UPSERT_DEBUG'].
    def build_default_logger
      if defined?(::Rails) && (rails_logger = ::Rails.logger)
        rails_logger
      elsif defined?(::ActiveRecord) && ::ActiveRecord.const_defined?(:Base) && (ar_logger = ::ActiveRecord::Base.logger)
        ar_logger
      else
        my_logger = Logger.new $stderr
        # Any other value (or none) leaves Logger's own default level in place.
        case ENV['UPSERT_DEBUG']
        when 'true'
          my_logger.level = Logger::DEBUG
        when 'false'
          my_logger.level = Logger::INFO
        end
        my_logger
      end
    end
  end

  # Guards lazy creation of the default logger.
  # @private
  LOGGER_MUTEX = Mutex.new

  SINGLE_QUOTE = %{'}
  DOUBLE_QUOTE = %{"}
  BACKTICK = %{`}
  X_AND_SINGLE_QUOTE = %{x'}
  USEC_SPRINTF = '%06d'
  # NOTE(review): the pre-1.9 factors are 86_400 times larger, presumably
  # because DateTime#sec_fraction was a fraction of a day there - confirm.
  if RUBY_VERSION >= '1.9.0'
    SEC_FRACTION = 1e6
    NANO_FRACTION = 1e9
  else
    SEC_FRACTION = 8.64e10
    NANO_FRACTION = 8.64e13
  end
  ISO8601_DATETIME = '%Y-%m-%d %H:%M:%S'
  ISO8601_DATE = '%F'
  UTC_TZ = '+00:00'
  NULL_WORD = 'NULL'
  METAL_CLASS_ALIAS = {
    'PGConn'                    => 'PG::Connection',
    'org.sqlite.Conn'           => 'Java::OrgSqlite::Conn', # for some reason, org.sqlite.Conn doesn't have a ruby class name
    'Sequel::Postgres::Adapter' => 'PG::Connection', # Only the Postgres adapter needs an alias
  }
  CREATED_COL_REGEX = /\Acreated_(at|on)\z/

  # @return [Upsert::Connection]
  attr_reader :connection

  # @return [String]
  attr_reader :table_name

  # @private
  attr_reader :merge_function_class

  # @private
  attr_reader :flavor

  # @private
  attr_reader :adapter

  # Column names (as strings) given through the +:increment+ option of the
  # most recent #row call.
  attr_accessor :increment_keys

  # @private
  def assume_function_exists?
    @assume_function_exists
  end

  # @param [Mysql2::Client,Sqlite3::Database,PG::Connection,#metal] connection A supported database connection.
  # @param [String,Symbol] table_name The name of the table into which you will be upserting.
  # @param [Hash] options
  # @option options [TrueClass,FalseClass] :assume_function_exists (false) Assume the function has already been defined correctly by another process.
  def initialize(connection, table_name, options = {})
    @table_name = table_name.to_s
    metal = Upsert.metal connection
    @flavor = Upsert.flavor metal
    @adapter = Upsert.adapter metal
    # Load every support file under upsert/ that is named after this flavor or adapter.
    # todo memoize
    Dir[File.expand_path("../upsert/**/{#{flavor.downcase},#{adapter}}.rb", __FILE__)].each do |path|
      require path
    end
    @connection = Connection.const_get(adapter).new self, metal
    @merge_function_class = MergeFunction.const_get adapter
    @merge_function_cache = {}
    @assume_function_exists = options.fetch :assume_function_exists, false
  end

  # Upsert a row given a selector and a setter.
  #
  # The selector values are used as setters if it's a new row. So if your selector is `name=Jerry` and your setter is `age=4`, and there is no Jerry yet, then a new row will be created with name Jerry and age 4.
  #
  # @see http://api.mongodb.org/ruby/1.6.4/Mongo/Collection.html#update-instance_method Loosely based on the upsert functionality of the mongo-ruby-driver #update method
  #
  # @param [Hash] selector Key-value pairs that will be used to find or create a row.
  # @param [Hash] setter Key-value pairs that will be set on the row, whether it previously existed or not.
  # @param [Hash] options Forwarded to Upsert::Row; its +:increment+ entry (default +[]+) is also stored, stringified, in #increment_keys.
  #
  # @return [nil]
  #
  # @example One at a time
  #   upsert = Upsert.new Pet.connection, Pet.table_name
  #   upsert.row({:name => 'Jerry'}, :breed => 'beagle')
  #   upsert.row({:name => 'Pierre'}, :breed => 'tabby')
  def row(selector, setter = {}, options = {})
    @increment_keys = options.fetch(:increment, []).map(&:to_s)
    row_object = Row.new(selector, setter, options)
    merge_function(row_object).execute(row_object)
    nil
  end

  # @private
  def clear_database_functions
    merge_function_class.clear connection
  end

  # Merge function for the column combination used by +row+; one instance is
  # built per distinct pair of selector keys and setter keys, then reused.
  #
  # @private
  def merge_function(row)
    selector_keys = row.selector.keys
    setter_keys = row.setter.keys
    cache_key = [selector_keys, setter_keys]
    @merge_function_cache[cache_key] ||= merge_function_class.new(self, selector_keys, setter_keys, assume_function_exists?)
  end

  # @private
  def quoted_table_name
    @quoted_table_name ||= connection.quote_ident table_name
  end

  # Column definitions of the table, looked up once and then cached; the
  # lookup receives whatever #increment_keys holds at the time of that first call.
  #
  # @private
  def column_definitions
    @column_definitions ||= ColumnDefinition.const_get(flavor).all connection, table_name, @increment_keys
  end
end