2021-01-28 09:10:30 +01:00
//
//  APIService+Persist+Timeline.swift
//  Mastodon
//
//  Created by sxiaojian on 2021/1/27.
//
import os . log
2021-02-04 07:45:44 +01:00
import func QuartzCore . CACurrentMediaTime
2021-01-28 09:10:30 +01:00
import Foundation
import Combine
import CoreData
import CoreDataStack
import MastodonSDK
extension APIService . Persist {
2021-02-04 07:45:44 +01:00
2021-01-28 09:10:30 +01:00
/// Kind of timeline a response handed to `persistTimeline` belongs to.
///
/// `persistTimeline` maps `.public` to `WorkingRecord.RecordType.publicTimeline` and
/// `.home` to `.homeTimeline`; only `.home` triggers the `hasMore` bookkeeping.
enum PersistTimelineType {
    case `public`, home
}
2021-02-04 07:45:44 +01:00
2021-01-28 09:10:30 +01:00
/// Persists a timeline response into Core Data and maintains the home timeline `hasMore` markers.
///
/// All database work happens inside `managedObjectContext.performChanges`. The publisher that
/// call produces is returned to the caller; a `.failure` output is printed in debug builds and
/// trips an `assertionFailure`.
///
/// - Parameters:
///   - managedObjectContext: Context the users/toots are fetched from and merged into.
///   - domain: Domain used to scope the fetch predicates and the created records.
///   - query: Query that produced `response`; its `maxID` (if any) identifies the anchor toot.
///   - response: Response carrying the statuses (`value`) and the `networkDate` used when merging.
///   - persistType: Which timeline the response belongs to.
///   - requestMastodonUserID: ID of the requesting user; could be `nil` when the response comes
///     from the public endpoint.
///   - log: Log handle used for log messages and signposts.
/// - Returns: A never-failing publisher emitting the `Result` of the persist operation.
static func persistTimeline(
    managedObjectContext: NSManagedObjectContext,
    domain: String,
    query: Mastodon.API.Timeline.TimelineQuery,
    response: Mastodon.Response.Content<[Mastodon.Entity.Status]>,
    persistType: PersistTimelineType,
    requestMastodonUserID: MastodonUser.ID?,
    log: OSLog
) -> AnyPublisher<Result<Void, Error>, Never> {
    let toots = response.value
    os_log(.info, log: log, " %{public}s[%{public}ld], %{public}s: persist %{public}ld toots… ", ((#file as NSString).lastPathComponent), #line, #function, toots.count)

    return managedObjectContext.performChanges {
        let contextTaskSignpostID = OSSignpostID(log: log)
        let start = CACurrentMediaTime()
        os_signpost(.begin, log: log, name: #function, signpostID: contextTaskSignpostID)
        defer {
            // End the interval on the same log handle it was begun on; otherwise the
            // begin/end pair only matches when the caller happens to pass `.api`.
            os_signpost(.end, log: log, name: #function, signpostID: contextTaskSignpostID)
            let end = CACurrentMediaTime()
            os_log(.info, log: .debug, " %{public}s[%{public}ld], %{public}s: persist cost %.2fs ", ((#file as NSString).lastPathComponent), #line, #function, end - start)
        }

        // Load the requesting user (nil when no user ID was supplied or the fetch fails).
        let requestMastodonUser: MastodonUser? = {
            guard let requestMastodonUserID = requestMastodonUserID else { return nil }
            let request = MastodonUser.sortedFetchRequest
            request.predicate = MastodonUser.predicate(domain: domain, id: requestMastodonUserID)
            request.fetchLimit = 1
            request.returnsObjectsAsFaults = false
            do {
                return try managedObjectContext.fetch(request).first
            } catch {
                assertionFailure(error.localizedDescription)
                return nil
            }
        }()

        // Load the working set (toots and their reblogs) into the context to avoid cache misses.
        let cacheTaskSignpostID = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: " load toots into cache ", signpostID: cacheTaskSignpostID)
        let workingIDRecord = APIService.Persist.WorkingIDRecord.workingID(entities: toots)
        let _tootCache: [Toot] = {
            let request = Toot.sortedFetchRequest
            let idSet = workingIDRecord.statusIDSet
                .union(workingIDRecord.reblogIDSet)
            let ids = Array(idSet)
            request.predicate = Toot.predicate(domain: domain, ids: ids)
            request.returnsObjectsAsFaults = false
            request.relationshipKeyPathsForPrefetching = [#keyPath(Toot.reblog)]
            do {
                return try managedObjectContext.fetch(request)
            } catch {
                assertionFailure(error.localizedDescription)
                return []
            }
        }()
        os_signpost(.event, log: log, name: " load toots into cache ", signpostID: cacheTaskSignpostID, " cached %{public}ld toots ", _tootCache.count)
        os_signpost(.end, log: log, name: " load toots into cache ", signpostID: cacheTaskSignpostID)

        // Toots of this response that already sit in the requesting user's home timeline.
        // Must be computed BEFORE the merge below, which adds timeline indexes.
        let mergedOldTootsInTimeline = _tootCache.filter {
            return $0.homeTimelineIndexes?.contains(where: { $0.userID == requestMastodonUserID }) ?? false
        }

        let updateDatabaseTaskSignpostID = OSSignpostID(log: log)
        let recordType: WorkingRecord.RecordType = {
            switch persistType {
            case .public: return .publicTimeline
            case .home: return .homeTimeline
            }
        }()

        var workingRecords: [WorkingRecord] = []
        os_signpost(.begin, log: log, name: " update database ", signpostID: updateDatabaseTaskSignpostID)
        for entity in toots {
            let processEntityTaskSignpostID = OSSignpostID(log: log)
            os_signpost(.begin, log: log, name: " update database - process entity ", signpostID: processEntityTaskSignpostID, " process entity %{public}s ", entity.id)
            defer {
                os_signpost(.end, log: log, name: " update database - process entity ", signpostID: processEntityTaskSignpostID, " process entity %{public}s ", entity.id)
            }
            let record = WorkingRecord.createOrMergeToot(
                into: managedObjectContext,
                for: requestMastodonUser,
                domain: domain,
                entity: entity,
                recordType: recordType,
                networkDate: response.networkDate,
                log: log
            )
            workingRecords.append(record)
        }
        os_signpost(.end, log: log, name: " update database ", signpostID: updateDatabaseTaskSignpostID)

        // Home timeline bookkeeping.
        switch persistType {
        case .home:
            // Task 1: clear `hasMore` on the anchor (the toot named by `maxID`).
            // The anchor is fetched from the database rather than taken from the working
            // records because it may have been removed on the remote.
            var anchorToot: Toot?
            if let maxID = query.maxID {
                do {
                    let request = Toot.sortedFetchRequest
                    request.predicate = Toot.predicate(domain: domain, id: maxID)
                    request.returnsObjectsAsFaults = false
                    request.fetchLimit = 1
                    anchorToot = try managedObjectContext.fetch(request).first
                    let timelineIndex = anchorToot.flatMap { toot in
                        toot.homeTimelineIndexes?.first(where: { $0.userID == requestMastodonUserID })
                    }
                    timelineIndex?.update(hasMore: false)
                } catch {
                    assertionFailure(error.localizedDescription)
                }
            }

            // Task 2: flag the oldest fetched toot with `hasMore` when the fetched toots do
            // not overlap with the timeline already stored locally.
            // With no working records there is nothing to flag; Task 1 already handled the anchor.
            let _oldestRecord = workingRecords.min(by: { $0.status.createdAt < $1.status.createdAt })
            if let oldestRecord = _oldestRecord {
                let shouldMarkHasMore: Bool
                if let anchorToot = anchorToot {
                    // With an anchor: (no overlap OR only the anchor overlaps) AND the oldest
                    // record is not the anchor itself.
                    let isNoOverlap = mergedOldTootsInTimeline.isEmpty
                    let isOnlyOverlapItself = mergedOldTootsInTimeline.count == 1 && mergedOldTootsInTimeline.first?.id == anchorToot.id
                    let isAnchorEqualOldestRecord = oldestRecord.status.id == anchorToot.id
                    shouldMarkHasMore = (isNoOverlap || isOnlyOverlapItself) && !isAnchorEqualOldestRecord
                } else {
                    // Without an anchor: no overlap at all.
                    shouldMarkHasMore = mergedOldTootsInTimeline.isEmpty
                }
                if shouldMarkHasMore {
                    let timelineIndex = oldestRecord.status.homeTimelineIndexes?
                        .first(where: { $0.userID == requestMastodonUserID })
                    timelineIndex?.update(hasMore: true)
                }
            }
        case .public:
            break
        }

        // Print the working record tree and counters (debug builds only).
        #if DEBUG
        // `WorkingRecord.log()` reads attributes of managed objects, so the text is built
        // here, still inside the `performChanges` block (presumably the context's queue —
        // confirm against `performChanges`), and only plain values are handed to the
        // background queue.
        let records = workingRecords
        let logs = records
            .map { record in record.log() }
            .joined(separator: " \n ")
        let mergedOldTootsCount = mergedOldTootsInTimeline.count
        DispatchQueue.global(qos: .utility).async {
            os_log(.info, log: log, " %{public}s[%{public}ld], %{public}s: working status: \n %s ", ((#file as NSString).lastPathComponent), #line, #function, logs)
            let counting = records
                .map { record in record.count() }
                .reduce(into: WorkingRecord.Counting(), { result, next in result = result + next })
            let newTootsInTimeLineCount = records.reduce(0, { result, next in
                return next.statusProcessType == .create ? result + 1 : result
            })
            os_log(.info, log: log, " %{public}s[%{public}ld], %{public}s: toot: insert %{public}ldT(%{public}ldTRQ), merge %{public}ldT(%{public}ldTRQ) ", ((#file as NSString).lastPathComponent), #line, #function, newTootsInTimeLineCount, counting.status.create, mergedOldTootsCount, counting.status.merge)
            os_log(.info, log: log, " %{public}s[%{public}ld], %{public}s: mastodon user: insert %{public}ld, merge %{public}ld ", ((#file as NSString).lastPathComponent), #line, #function, counting.user.create, counting.user.merge)
        }
        #endif
    }
    .handleEvents(receiveOutput: { result in
        switch result {
        case .success:
            break
        case .failure(let error):
            #if DEBUG
            debugPrint(error)
            #endif
            assertionFailure(error.localizedDescription)
        }
    })
    .eraseToAnyPublisher()
}
}
2021-02-04 07:45:44 +01:00
extension APIService . Persist {
/// ID sets collected from a batch of status entities: the IDs of the timeline statuses,
/// the IDs of the statuses they reblog, and the IDs of all involved accounts.
struct WorkingIDRecord {
    var statusIDSet: Set<String>
    var reblogIDSet: Set<String>
    var userIDSet: Set<String>

    /// Decides which ID set an entity's own ID is filed under.
    enum RecordType {
        case timeline
        case reblog
    }

    init(statusIDSet: Set<String> = Set(), reblogIDSet: Set<String> = Set(), userIDSet: Set<String> = Set()) {
        self.statusIDSet = statusIDSet
        self.reblogIDSet = reblogIDSet
        self.userIDSet = userIDSet
    }

    /// Merges every ID set of `record` into the corresponding set of this record.
    mutating func union(record: WorkingIDRecord) {
        statusIDSet.formUnion(record.statusIDSet)
        reblogIDSet.formUnion(record.reblogIDSet)
        userIDSet.formUnion(record.userIDSet)
    }

    /// Collects the IDs of all `entities`, treating each one as a timeline status.
    static func workingID(entities: [Mastodon.Entity.Status]) -> WorkingIDRecord {
        return entities.reduce(into: WorkingIDRecord()) { merged, entity in
            merged.union(record: workingID(entity: entity, recordType: .timeline))
        }
    }

    /// Collects the IDs of a single entity and, recursively, of the status it reblogs.
    /// Nested reblogs are always filed as `.reblog`.
    private static func workingID(entity: Mastodon.Entity.Status, recordType: RecordType) -> WorkingIDRecord {
        var collected = WorkingIDRecord(userIDSet: [entity.account.id])
        switch recordType {
        case .timeline:
            collected.statusIDSet.insert(entity.id)
        case .reblog:
            collected.reblogIDSet.insert(entity.id)
        }
        if let reblog = entity.reblog {
            collected.union(record: workingID(entity: reblog, recordType: .reblog))
        }
        return collected
    }
}
/// Node of the tree built while persisting a status: the persisted `Toot`, whether the
/// toot and its author were created or merged, and the child records (its reblog, if any).
class WorkingRecord {
    let status: Toot
    let children: [WorkingRecord]
    let recordType: RecordType
    let statusProcessType: ProcessType
    let userProcessType: ProcessType

    init(
        status: Toot,
        children: [APIService.Persist.WorkingRecord],
        recordType: APIService.Persist.WorkingRecord.RecordType,
        tootProcessType: ProcessType,
        userProcessType: ProcessType
    ) {
        self.status = status
        self.children = children
        self.recordType = recordType
        self.statusProcessType = tootProcessType
        self.userProcessType = userProcessType
    }

    /// Where a record came from; `flag` is the marker used in the debug tree dump.
    enum RecordType {
        case publicTimeline
        case homeTimeline
        case mentionTimeline
        case userTimeline
        case favoriteTimeline
        case searchTimeline
        case reblog

        var flag: String {
            switch self {
            case .publicTimeline:
                return " P "
            case .homeTimeline:
                return " H "
            case .mentionTimeline:
                return " M "
            case .userTimeline:
                return " U "
            case .favoriteTimeline:
                return " F "
            case .searchTimeline:
                return " S "
            case .reblog:
                return " R "
            }
        }
    }

    /// Whether an object was newly created or merged; `flag` is the debug dump marker.
    enum ProcessType {
        case create
        case merge

        var flag: String {
            switch self {
            case .create:
                return " + "
            case .merge:
                return " - "
            }
        }
    }

    /// Renders this record and, one indent level deeper each, all of its children.
    /// - Parameter indentLevel: Number of indent units prefixed to this record's line.
    /// - Returns: One line per record, joined by the separator literal.
    func log(indentLevel: Int = 0) -> String {
        let padding = String(repeating: " ", count: indentLevel)
        let preview = status.content.prefix(32).replacingOccurrences(of: " \n ", with: " ")
        let headline = " \(padding) [ \(statusProcessType.flag) \(recordType.flag) ]( \(status.id) ) [ \(userProcessType.flag) ]( \(status.author.id) )@ \(status.author.username) ~> \(preview) "
        let lines = [headline] + children.map { $0.log(indentLevel: indentLevel + 1) }
        return lines.joined(separator: " \n ")
    }

    /// Create/merge tallies for statuses and users.
    struct Counting {
        var status = Counter()
        var user = Counter()

        static func + (left: Counting, right: Counting) -> Counting {
            var sum = left
            sum.status = left.status + right.status
            sum.user = left.user + right.user
            return sum
        }

        struct Counter {
            var create = 0
            var merge = 0

            static func + (left: Counter, right: Counter) -> Counter {
                var sum = left
                sum.create += right.create
                sum.merge += right.merge
                return sum
            }
        }
    }

    /// Tallies the process types of this record and all of its descendants.
    func count() -> Counting {
        var own = Counting()
        if statusProcessType == .create {
            own.status.create += 1
        } else {
            own.status.merge += 1
        }
        if userProcessType == .create {
            own.user.create += 1
        } else {
            own.user.merge += 1
        }
        return children.reduce(own) { subtotal, child in subtotal + child.count() }
    }

    /// Creates or merges the toot for `entity` (processing its reblog first) and returns the
    /// resulting record tree. For `.homeTimeline` records a `HomeTimelineIndex` is inserted
    /// for the request user unless the toot already has one for that user.
    static func createOrMergeToot(
        into managedObjectContext: NSManagedObjectContext,
        for requestMastodonUser: MastodonUser?,
        domain: String,
        entity: Mastodon.Entity.Status,
        recordType: RecordType,
        networkDate: Date,
        log: OSLog
    ) -> WorkingRecord {
        let signpostID = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: " update database - process entity: createorMergeToot ", signpostID: signpostID, " process toot %{public}s ", entity.id)
        defer {
            os_signpost(.end, log: log, name: " update database - process entity: createorMergeToot ", signpostID: signpostID, " finish process toot %{public}s ", entity.id)
        }

        // Build the tree bottom-up: the reblogged status (if any) is processed first.
        var children: [WorkingRecord] = []
        if let reblog = entity.reblog {
            let reblogRecord = createOrMergeToot(
                into: managedObjectContext,
                for: requestMastodonUser,
                domain: domain,
                entity: reblog,
                recordType: .reblog,
                networkDate: networkDate,
                log: log
            )
            children.append(reblogRecord)
        }

        let (status, isTootCreated, isTootUserCreated) = APIService.CoreData.createOrMergeToot(
            into: managedObjectContext,
            for: requestMastodonUser,
            entity: entity,
            domain: domain,
            networkDate: networkDate,
            log: log
        )
        let result = WorkingRecord(
            status: status,
            children: children,
            recordType: recordType,
            tootProcessType: isTootCreated ? .create : .merge,
            userProcessType: isTootUserCreated ? .create : .merge
        )

        // Only home timeline records (created or merged alike) get a timeline index.
        guard recordType == .homeTimeline else {
            return result
        }
        guard let requestMastodonUserID = requestMastodonUser?.id else {
            assertionFailure(" Request user is required for home timeline ")
            return result
        }

        let isAlreadyInHomeTimeline = status.homeTimelineIndexes?
            .contains(where: { $0.userID == requestMastodonUserID }) ?? false
        if !isAlreadyInHomeTimeline {
            let timelineIndexProperty = HomeTimelineIndex.Property(domain: domain, userID: requestMastodonUserID)
            _ = HomeTimelineIndex.insert(
                into: managedObjectContext,
                property: timelineIndexProperty,
                toot: status
            )
        }
        return result
    }
}
}