Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Use Retrievers for fetching remote / local data #104

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Restructure retriever files
julik committed Apr 19, 2024
commit e61bb94b9f134d59178152c57609a719da59e7d5
4 changes: 3 additions & 1 deletion lib/zipline.rb
Original file line number Diff line number Diff line change
@@ -11,7 +11,9 @@
module Zipline
require_relative "zipline/version"
require_relative "zipline/zip_handler"
require_relative "zipline/retrievers"
Dir.glob(__dir__ + "/zipline/retrievers/*.rb").sort.each do |retriever_module_path|
require retriever_module_path
end

def self.included(into_controller)
into_controller.include(ZipKit::RailsStreaming)
124 changes: 0 additions & 124 deletions lib/zipline/retrievers.rb
Original file line number Diff line number Diff line change
@@ -1,125 +1 @@
module Zipline
  # Retriever for generic IO-like objects: copies everything that can be read
  # from the wrapped object into the destination.
  class IORetriever
    # Builds a retriever when the item quacks like an IO.
    #
    # @param item [Object] candidate object
    # @return [Zipline::IORetriever, nil] a retriever if `item` responds to both
    #   `read` and `read_nonblock`, otherwise nil
    def self.build_for(item)
      io_like = %i[read read_nonblock].all? { |method_name| item.respond_to?(method_name) }
      io_like ? new(item) : nil
    end

    # @param io [#read] the object to copy data from
    def initialize(io)
      @io = io
    end

    # Copies the wrapped IO into `destination` using IO.copy_stream.
    #
    # @param destination [#write] sink receiving the data
    # @return [Integer] whatever IO.copy_stream returns (the number of bytes copied)
    def download_and_write_into(destination)
      source = @io
      IO.copy_stream(source, destination)
    end
  end
end

module Zipline
  # Retriever specialised for File objects: rewinds the file before copying
  # and always closes it afterwards.
  class FileRetriever < Zipline::IORetriever
    # @param item [Object] candidate object
    # @return [Zipline::FileRetriever, nil] a retriever when `item` is a File
    #   (and the parent class accepts it), otherwise nil
    def self.build_for(item)
      return nil unless item.is_a?(File)

      super(item)
    end

    # Rewinds the file, delegates the copy to the parent class and closes the
    # file whether or not the copy succeeded.
    #
    # @param destination [#write] sink receiving the file contents
    def download_and_write_into(destination)
      begin
        @io.rewind
        super(destination)
      ensure
        @io.close
      end
    end
  end
end

module Zipline
  # Retriever that streams the body of an HTTP(S) resource into the archive.
  class HTTPRetriever
    # Builds a retriever for an HTTP or HTTPS location.
    #
    # @param url_or_uri [String, URI::Generic, Object] candidate location
    # @return [Zipline::HTTPRetriever, nil] a retriever when the candidate is
    #   (or parses to) an http/https URI, otherwise nil
    def self.build_for(url_or_uri)
      uri = coerce_to_uri(url_or_uri)
      # URI::HTTPS inherits from URI::HTTP, so one check covers both schemes.
      new(uri) if uri.is_a?(URI::HTTP)
    end

    # Turns the candidate into a URI object, or nil when that is not possible.
    # Already-parsed URIs are used as they are, because URI.parse expects a
    # string-like argument and does not reliably accept URI instances.
    def self.coerce_to_uri(url_or_uri)
      return url_or_uri if url_or_uri.is_a?(URI::Generic)

      URI.parse(url_or_uri)
    rescue StandardError
      # Anything that cannot be parsed simply is not ours to handle.
      nil
    end
    private_class_method :coerce_to_uri

    # @param uri [URI::HTTP] location to download from
    def initialize(uri)
      @uri = uri
    end

    # Performs a GET request and writes the response body into `destination`
    # chunk by chunk.
    #
    # @param destination [#write] sink receiving the body
    # @raise [Net::HTTPExceptions] (via Net::HTTPResponse#value) when the
    #   response is not a 2xx, so that an error page never ends up in the
    #   archive in place of the file contents
    def download_and_write_into(destination)
      Net::HTTP.get_response(@uri) do |response|
        response.value # raises unless the response is a success
        response.read_body do |chunk|
          destination.write(chunk)
        end
      end
    end
  end
end

module Zipline
  # Retriever that writes the bytes of an in-memory String into the archive.
  class StringRetriever
    # @param item [Object] candidate object
    # @return [Zipline::StringRetriever, nil] a retriever when `item` is a
    #   String, otherwise nil
    def self.build_for(item)
      return unless item.is_a?(String)

      new(item)
    end

    # @param string [String] the content to write
    def initialize(string)
      @string = string
    end

    # Writes the string into `destination` in slices of at most 1024 bytes.
    # Only non-empty slices are written: the loop stops at the end of the
    # string, so the destination never receives an empty string or nil.
    #
    # @param destination [#write] sink receiving the bytes
    def download_and_write_into(destination)
      chunk_size = 1024
      (0...@string.bytesize).step(chunk_size) do |offset|
        destination.write(@string.byteslice(offset, chunk_size))
      end
    end

    # In-memory data has no transient failure mode worth retrying.
    #
    # @return [false]
    def may_restart_after?(e)
      false
    end
  end
end

module Zipline
  # Adapter for CarrierWave files stored via Fog: downloads them over HTTP
  # using the URL the file reports.
  class CarrierwaveRetriever
    # @param item [Object] candidate object
    # @return [Zipline::HTTPRetriever, nil] an HTTP retriever for the file's
    #   URL when `item` is a CarrierWave::Storage::Fog::File (and CarrierWave
    #   is loaded), otherwise nil
    def self.build_for(item)
      return unless defined?(CarrierWave::Storage::Fog::File) && item.is_a?(CarrierWave::Storage::Fog::File)

      # Go through build_for rather than new: `item.url` is passed on as-is,
      # and HTTPRetriever#initialize expects an already parsed URI.
      Zipline::HTTPRetriever.build_for(item.url)
    end
  end
end

module Zipline
  # Retriever for ActiveStorage attachments and blobs.
  class ActiveStorageRetriever
    # @param item [Object] candidate object
    # @return [Zipline::ActiveStorageRetriever, nil] a retriever wrapping the
    #   blob when `item` is an attachment, a has-one-attached proxy or a blob;
    #   nil when ActiveStorage is not loaded or the item is something else
    def self.build_for(item)
      return unless defined?(ActiveStorage)

      if is_active_storage_attachment?(item) || is_active_storage_one?(item)
        new(item.blob)
      elsif is_active_storage_blob?(item)
        new(item)
      end
    end

    # @return [Boolean, nil] whether `item` is an ActiveStorage::Attachment
    #   (nil when that constant is not defined)
    def self.is_active_storage_attachment?(item)
      item.is_a?(ActiveStorage::Attachment) if defined?(ActiveStorage::Attachment)
    end

    # @return [Boolean, nil] whether `item` is an ActiveStorage::Attached::One
    #   (nil when that constant is not defined)
    def self.is_active_storage_one?(item)
      item.is_a?(ActiveStorage::Attached::One) if defined?(ActiveStorage::Attached::One)
    end

    # @return [Boolean, nil] whether `item` is an ActiveStorage::Blob
    #   (nil when that constant is not defined)
    def self.is_active_storage_blob?(item)
      item.is_a?(ActiveStorage::Blob) if defined?(ActiveStorage::Blob)
    end

    # @param blob [Object] the blob to download; it must respond to `download`
    def initialize(blob)
      @blob = blob
    end

    # Downloads the blob, forwarding every piece of data yielded by
    # `download` to the destination.
    #
    # @param destination [#write] sink receiving the data
    def download_and_write_into(destination)
      @blob.download { |bytes| destination.write(bytes) }
    end
  end
end

module Zipline
  # Adapter for Paperclip attachments: picks a file-based or HTTP-based
  # retriever depending on where the attachment is stored.
  class PaperclipRetriever
    # @param item [Object] candidate object
    # @return [Object, nil] a FileRetriever for attachments whose storage
    #   option is :filesystem, an HTTPRetriever (built from the attachment's
    #   expiring URL) for any other storage, or nil when Paperclip is not
    #   loaded or `item` is not a Paperclip::Attachment
    def self.build_for(item)
      return unless defined?(Paperclip) && item.is_a?(Paperclip::Attachment)

      if item.options[:storage] == :filesystem
        # The opened file is handed over to the FileRetriever.
        Zipline::FileRetriever.build_for(File.open(item.path, "rb"))
      else
        # The URL has to come from the attachment itself (`item`).
        Zipline::HTTPRetriever.build_for(item.expiring_url)
      end
    end
  end
end
34 changes: 34 additions & 0 deletions lib/zipline/retrievers/active_storage_retriever.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module Zipline
  # Retriever for ActiveStorage attachments and blobs.
  class ActiveStorageRetriever
    # @param item [Object] candidate object
    # @return [Zipline::ActiveStorageRetriever, nil] a retriever wrapping the
    #   blob when `item` is an attachment, a has-one-attached proxy or a blob;
    #   nil when ActiveStorage is not loaded or the item is something else
    def self.build_for(item)
      return unless defined?(ActiveStorage)

      if is_active_storage_attachment?(item) || is_active_storage_one?(item)
        new(item.blob)
      elsif is_active_storage_blob?(item)
        new(item)
      end
    end

    # @return [Boolean] whether `item` is an ActiveStorage::Attachment
    def self.is_active_storage_attachment?(item)
      ActiveStorage::Attachment === item
    end

    # @return [Boolean] whether `item` is an ActiveStorage::Attached::One
    def self.is_active_storage_one?(item)
      ActiveStorage::Attached::One === item
    end

    # @return [Boolean] whether `item` is an ActiveStorage::Blob
    def self.is_active_storage_blob?(item)
      ActiveStorage::Blob === item
    end

    # @param blob [Object] the blob to download; it must respond to `download`
    def initialize(blob)
      @blob = blob
    end

    # Downloads the blob, forwarding every piece of data yielded by
    # `download` to the destination.
    #
    # @param destination [#write] sink receiving the data
    def retrieve_into(destination)
      @blob.download { |bytes| destination.write(bytes) }
    end

    # This retriever never asks for a restart.
    #
    # @return [false]
    def restartable?(_exception)
      false
    end
  end
end
7 changes: 7 additions & 0 deletions lib/zipline/retrievers/carrierwave_retriever.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module Zipline
  # Adapter for CarrierWave files stored via Fog: downloads them over HTTP
  # using the URL the file reports.
  class CarrierwaveRetriever
    # @param item [Object] candidate object
    # @return [Zipline::HTTPRetriever, nil] an HTTP retriever for the file's
    #   URL when `item` is a CarrierWave::Storage::Fog::File (and CarrierWave
    #   is loaded), otherwise nil
    def self.build_for(item)
      return unless defined?(CarrierWave::Storage::Fog::File) && item.is_a?(CarrierWave::Storage::Fog::File)

      # Go through build_for rather than new: `item.url` is passed on as-is,
      # and HTTPRetriever#initialize expects an already parsed URI.
      Zipline::HTTPRetriever.build_for(item.url)
    end
  end
end
21 changes: 21 additions & 0 deletions lib/zipline/retrievers/file_retriever.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Zipline
  # Retriever for File objects: copies the file from its beginning, in binary
  # mode, and closes it when done.
  class FileRetriever
    # @param item [Object] candidate object
    # @return [Zipline::FileRetriever, nil] a retriever when `item` is a File,
    #   otherwise nil
    def self.build_for(item)
      item.is_a?(File) ? new(item) : nil
    end

    # @param file [File] the file to copy; it is closed after retrieval
    def initialize(file)
      @file = file
    end

    # Rewinds the file, switches it to binary mode and copies it into
    # `destination`. The file is closed whether or not the copy succeeded.
    #
    # @param destination [#write] sink receiving the file contents
    def retrieve_into(destination)
      begin
        source = @file
        source.rewind
        source.binmode
        IO.copy_stream(source, destination)
      ensure
        @file.close
      end
    end

    # This retriever never asks for a restart.
    #
    # @return [false]
    def restartable?(_exception)
      false
    end
  end
end
34 changes: 34 additions & 0 deletions lib/zipline/retrievers/http_retriever.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module Zipline
  # Retriever that streams the body of an HTTP(S) resource into the archive.
  class HTTPRetriever
    # Upper bound on download attempts per retriever. Once it is reached,
    # `restartable?` answers false so a caller that retries on its say-so
    # cannot loop forever against a persistently failing server.
    MAX_ATTEMPTS = 3

    # Builds a retriever for an HTTP or HTTPS location.
    #
    # @param url_or_uri [String, URI::Generic, Object] candidate location
    # @return [Zipline::HTTPRetriever, nil] a retriever when the candidate is
    #   an http/https URI, or a String starting with "http" that parses to
    #   one; otherwise nil
    def self.build_for(url_or_uri)
      uri = coerce_to_uri(url_or_uri)
      # URI::HTTPS inherits from URI::HTTP, so one check covers both schemes.
      new(uri) if uri.is_a?(URI::HTTP)
    end

    # Turns the candidate into a URI object, or nil when that is not possible.
    # Already-parsed URIs are used as they are, because URI.parse expects a
    # string-like argument and does not reliably accept URI instances.
    def self.coerce_to_uri(url_or_uri)
      return url_or_uri if url_or_uri.is_a?(URI::Generic)
      return unless url_or_uri.is_a?(String) && url_or_uri.start_with?("http")

      URI.parse(url_or_uri)
    rescue StandardError
      # A string that merely starts with "http" but is not a URL is not ours.
      nil
    end
    private_class_method :coerce_to_uri

    # @param uri [URI::HTTP] location to download from
    def initialize(uri)
      @uri = uri
      @attempts = 0
    end

    # Performs a GET request and writes the response body into `destination`
    # chunk by chunk.
    #
    # @param destination [#write] sink receiving the body
    # @raise [Net::HTTPExceptions] (via Net::HTTPResponse#value) when the
    #   response is not a 2xx, so that an error page never ends up in the
    #   archive in place of the file contents
    def retrieve_into(destination)
      @attempts += 1
      Net::HTTP.get_response(@uri) do |response|
        response.value # raises unless the response is a success
        response.read_body do |chunk|
          destination.write(chunk)
        end
      end
    end

    # Tells whether retrieval may be attempted again after `exception`.
    # Only failures that can plausibly go away on their own qualify: 5xx
    # responses (raised as Net::HTTPFatalError by Net::HTTPResponse#value) and
    # connect/read timeouts. Client errors (4xx) are not retried.
    #
    # @param exception [Exception] the error raised during retrieval
    # @return [Boolean]
    def restartable?(exception)
      return false if @attempts >= MAX_ATTEMPTS

      case exception
      when Net::HTTPFatalError, Net::OpenTimeout, Net::ReadTimeout then true
      else false
      end
    end
  end
end
17 changes: 17 additions & 0 deletions lib/zipline/retrievers/io_retriever.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module Zipline
  # Retriever for generic IO-like objects: copies everything that can be read
  # from the wrapped object into the destination.
  class IORetriever
    # Builds a retriever when the item quacks like an IO.
    #
    # @param item [Object] candidate object
    # @return [Zipline::IORetriever, nil] a retriever if `item` responds to both
    #   `read` and `read_nonblock`, otherwise nil
    def self.build_for(item)
      io_like = %i[read read_nonblock].all? { |method_name| item.respond_to?(method_name) }
      io_like ? new(item) : nil
    end

    # @param io [#read] the object to copy data from
    def initialize(io)
      @io = io
    end

    # Copies the wrapped IO into `destination` using IO.copy_stream.
    #
    # @param destination [#write] sink receiving the data
    # @return [Integer] whatever IO.copy_stream returns (the number of bytes copied)
    def retrieve_into(destination)
      source = @io
      IO.copy_stream(source, destination)
    end

    # This retriever never asks for a restart.
    #
    # @return [false]
    def restartable?(_exception)
      false
    end
  end
end
10 changes: 10 additions & 0 deletions lib/zipline/retrievers/paperclip_retriever.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module Zipline
  # Adapter for Paperclip attachments: picks a file-based or HTTP-based
  # retriever depending on where the attachment is stored.
  class PaperclipRetriever
    # @param item [Object] candidate object
    # @return [Object, nil] a FileRetriever for attachments whose storage
    #   option is :filesystem, an HTTPRetriever (built from the attachment's
    #   expiring URL) for any other storage, or nil when Paperclip is not
    #   loaded or `item` is not a Paperclip::Attachment
    def self.build_for(item)
      return unless defined?(Paperclip) && item.is_a?(Paperclip::Attachment)

      if item.options[:storage] == :filesystem
        # The opened file is handed over to the FileRetriever.
        Zipline::FileRetriever.build_for(File.open(item.path, "rb"))
      else
        # The URL has to come from the attachment itself (`item`).
        Zipline::HTTPRetriever.build_for(item.expiring_url)
      end
    end
  end
end
18 changes: 18 additions & 0 deletions lib/zipline/retrievers/shrine_retriever.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module Zipline
  # Retriever for files uploaded through Shrine.
  class ShrineRetriever
    # @param item [Object] candidate object
    # @return [Zipline::ShrineRetriever, nil] a retriever when Shrine is
    #   loaded and `item` is a Shrine::UploadedFile, otherwise nil
    def self.build_for(item)
      shrine_file = defined?(Shrine::UploadedFile) && item.is_a?(Shrine::UploadedFile)
      new(item) if shrine_file
    end

    # @param shrine_uploaded_file [Object] the uploaded file; it must respond
    #   to `stream`
    def initialize(shrine_uploaded_file)
      @shrine_uploaded_file = shrine_uploaded_file
    end

    # Delegates to the uploaded file's own `stream` method, handing it the
    # destination.
    #
    # @param destination [#write] sink receiving the file contents
    def retrieve_into(destination)
      uploaded_file = @shrine_uploaded_file
      uploaded_file.stream(destination)
    end

    # This retriever never asks for a restart.
    #
    # @return [false]
    def restartable?(_exception)
      false
    end
  end
end
25 changes: 25 additions & 0 deletions lib/zipline/retrievers/string_retriever.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module Zipline
  # Retriever that writes the bytes of an in-memory String into the archive.
  class StringRetriever
    # @param item [Object] candidate object
    # @return [Zipline::StringRetriever, nil] a retriever when `item` is a
    #   String, otherwise nil
    def self.build_for(item)
      return unless item.is_a?(String)

      new(item)
    end

    # @param string [String] the content to write
    def initialize(string)
      @string = string
    end

    # Writes the string into `destination` in slices of at most 1024 bytes.
    # Only non-empty slices are written: the loop stops at the end of the
    # string, so the destination never receives an empty string or nil.
    #
    # @param destination [#write] sink receiving the bytes
    def retrieve_into(destination)
      chunk_size = 1024
      (0...@string.bytesize).step(chunk_size) do |offset|
        destination.write(@string.byteslice(offset, chunk_size))
      end
    end

    # In-memory data has no transient failure mode worth retrying.
    #
    # @return [false]
    def restartable?(_exception)
      false
    end
  end
end
24 changes: 21 additions & 3 deletions lib/zipline/zip_handler.rb
Original file line number Diff line number Diff line change
@@ -17,8 +17,25 @@ def handle_file(file, name, options)

# Writes one entry into the ZIP being streamed, pulling its contents through
# the retriever picked for `item`.
#
# When retrieval fails with an exception the retriever declares restartable,
# the entry is written again from scratch, up to a fixed number of attempts.
# Any other failure, or a failure on the last attempt, is logged and re-raised.
#
# @param item [Object] the thing to put into the archive
# @param name [String] name of the entry inside the archive
# @param options [Hash] entry options; only :modification_time is forwarded
#   to the streamer
# @raise [StandardError] the retrieval error, once it cannot be retried
def write_item(item, name, options)
  max_attempts = 3
  retriever = pick_retriever_for(item)
  attempts = 0
  begin
    attempts += 1
    @streamer.write_file(name, **options.slice(:modification_time)) do |writable|
      payload = {retriever_class: retriever.class.to_s, filename: name}
      ActiveSupport::Notifications.instrument("zipline.retrieve_and_write", payload) do
        retriever.retrieve_into(writable)
      end
    end
  rescue => exception
    # If an exception is raised and it is known to be caused by the data retrieval from
    # remote, we can retry outputting this particular file. ZipKit will rollback the file
    # before raising the exception to us, so we can just redo the `write_file` call.
    # The attempt counter keeps a persistently failing source from retrying forever.
    if attempts < max_attempts && retriever.restartable?(exception)
      @logger.warn { "Retrieval of #{name.inspect} will be reattempted after #{exception} (attempt #{attempts} of #{max_attempts})" }
      retry
    else
      @logger.warn { "Retrieval of #{name.inspect} failed with #{exception} after #{attempts} attempt(s), aborting" }
      raise
    end
  end
end

@@ -27,10 +44,11 @@ def pick_retriever_for(item)
Zipline::CarrierwaveRetriever,
Zipline::ActiveStorageRetriever,
Zipline::PaperclipRetriever,
Zipline::ShrineRetriever,
Zipline::FileRetriever,
Zipline::IORetriever,
Zipline::HTTPRetriever,
Zipline::StringRetriever,
Zipline::StringRetriever
]
retriever_classes.each do |retriever_class|
maybe_retriever = retriever_class.build_for(item)