test_file.py 193 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206420742084209421042114212421342144215421642174218421942204221422242234224422542264227422842294230423142324233423442354236423742384239424042414242424342444245424642474248424942504251425242534254425542564257425842594260426142624263426442654266426742684269427042714272427342744275427642774278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477747784779478047814782478347844785478647874788478947904791479247934794479547964797479847994800480148024803480448054806480748084809481048114812481348144815481648174818481948204821482248234824482548264827482848294830483148324833483448354836483748384839484048414842484348444845484648474848484948504851485248534854485548564857485848594860486148624863486448654866486748684869487048714872487348744875487648774878487948804881488248834884488548864887488848894890489148924893489448954896489748984899490049014902490349044905490649074908490949104911491249134914491549164917491849194920492149224923492449254926492749284929493049314932493349344935493649374938493949404941494249434944494549464947494849494950495149524953495449554956495749584959496049614962496349644965496649674968496949704971497249734974497549764977497849794980498149824983498449854986498749884989499049914992499349944995499649974998499950005001500250035004500550065007500850095010501150125013501450155016501750185019502050215022502350245025502650275028502950305031503250335034503550365037503850395040504150425043504450455046504750485049505050515052505350545055505650575058505950605061506250635064506550665067506850695070507150725073507450755076507750785079508050815082508350845085508650875088508950905091509250935094509550965097509850995100510151025103510451055106510751085109511051115112511351145115511651175118511951205121512251235124512551265127512851295130513151325133513451355136513751385139514051415142514351445145514651475148514951505151515251535154515551565157515851595160516151625163516451655166516751685169517051715172517351745175517651775178517951805181518251835184518551865187518851895190519151925193519451955196519751985199520052015202520352045205520652075208520952105211521252135214521552165217521852195220522152225223522452255226522752285229523052315232523352345235523652375238523952405241
  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests for the file state
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import errno
  7. import filecmp
  8. import logging
  9. import os
  10. import re
  11. import shutil
  12. import stat
  13. import sys
  14. import tempfile
  15. import textwrap
  16. import pytest
  17. import salt.serializers.configparser
  18. import salt.utils.data
  19. import salt.utils.files
  20. import salt.utils.json
  21. import salt.utils.path
  22. import salt.utils.platform
  23. import salt.utils.stringutils
  24. from salt.ext import six
  25. from salt.ext.six.moves import range
  26. from salt.utils.versions import LooseVersion as _LooseVersion
  27. from tests.support.case import ModuleCase
  28. from tests.support.helpers import (
  29. Webserver,
  30. dedent,
  31. with_system_user_and_group,
  32. with_tempdir,
  33. with_tempfile,
  34. )
  35. from tests.support.mixins import SaltReturnAssertsMixin
  36. from tests.support.runtests import RUNTIME_VARS
  37. from tests.support.unit import skipIf
  38. log = logging.getLogger(__name__)
  39. HAS_PWD = True
  40. try:
  41. import pwd
  42. except ImportError:
  43. HAS_PWD = False
  44. HAS_GRP = True
  45. try:
  46. import grp
  47. except ImportError:
  48. HAS_GRP = False
  49. IS_WINDOWS = salt.utils.platform.is_windows()
  50. BINARY_FILE = b"GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;"
  51. TEST_SYSTEM_USER = "test_system_user"
  52. TEST_SYSTEM_GROUP = "test_system_group"
  53. def _test_managed_file_mode_keep_helper(testcase, local=False):
  54. """
  55. DRY helper function to run the same test with a local or remote path
  56. """
  57. name = os.path.join(RUNTIME_VARS.TMP, "scene33")
  58. grail_fs_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  59. grail = "salt://grail/scene33" if not local else grail_fs_path
  60. # Get the current mode so that we can put the file back the way we
  61. # found it when we're done.
  62. grail_fs_mode = int(testcase.run_function("file.get_mode", [grail_fs_path]), 8)
  63. initial_mode = 0o770
  64. new_mode_1 = 0o600
  65. new_mode_2 = 0o644
  66. # Set the initial mode, so we can be assured that when we set the mode
  67. # to "keep", we're actually changing the permissions of the file to the
  68. # new mode.
  69. ret = testcase.run_state(
  70. "file.managed", name=name, mode=oct(initial_mode), source=grail,
  71. )
  72. if IS_WINDOWS:
  73. testcase.assertSaltFalseReturn(ret)
  74. return
  75. testcase.assertSaltTrueReturn(ret)
  76. try:
  77. # Update the mode on the fileserver (pass 1)
  78. os.chmod(grail_fs_path, new_mode_1)
  79. ret = testcase.run_state("file.managed", name=name, mode="keep", source=grail,)
  80. testcase.assertSaltTrueReturn(ret)
  81. managed_mode = stat.S_IMODE(os.stat(name).st_mode)
  82. testcase.assertEqual(oct(managed_mode), oct(new_mode_1))
  83. # Update the mode on the fileserver (pass 2)
  84. # This assures us that if the file in file_roots was originally set
  85. # to the same mode as new_mode_1, we definitely get an updated mode
  86. # this time.
  87. os.chmod(grail_fs_path, new_mode_2)
  88. ret = testcase.run_state("file.managed", name=name, mode="keep", source=grail,)
  89. testcase.assertSaltTrueReturn(ret)
  90. managed_mode = stat.S_IMODE(os.stat(name).st_mode)
  91. testcase.assertEqual(oct(managed_mode), oct(new_mode_2))
  92. finally:
  93. # Set the mode of the file in the file_roots back to what it
  94. # originally was.
  95. os.chmod(grail_fs_path, grail_fs_mode)
  96. @pytest.mark.windows_whitelisted
  97. class FileTest(ModuleCase, SaltReturnAssertsMixin):
  98. """
  99. Validate the file state
  100. """
  101. def _delete_file(self, path):
  102. try:
  103. os.remove(path)
  104. except OSError as exc:
  105. if exc.errno != errno.ENOENT:
  106. log.error("Failed to remove %s: %s", path, exc)
  107. def tearDown(self):
  108. """
  109. remove files created in previous tests
  110. """
  111. user = "salt"
  112. if user in str(self.run_function("user.list_users")):
  113. self.run_function("user.delete", [user])
  114. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  115. def test_symlink(self):
  116. """
  117. file.symlink
  118. """
  119. name = os.path.join(RUNTIME_VARS.TMP, "symlink")
  120. tgt = os.path.join(RUNTIME_VARS.TMP, "target")
  121. # Windows must have a source directory to link to
  122. if IS_WINDOWS and not os.path.isdir(tgt):
  123. os.mkdir(tgt)
  124. # Windows cannot create a symlink if it already exists
  125. if IS_WINDOWS and self.run_function("file.is_link", [name]):
  126. self.run_function("file.remove", [name])
  127. ret = self.run_state("file.symlink", name=name, target=tgt)
  128. self.assertSaltTrueReturn(ret)
  129. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  130. def test_test_symlink(self):
  131. """
  132. file.symlink test interface
  133. """
  134. name = os.path.join(RUNTIME_VARS.TMP, "symlink2")
  135. tgt = os.path.join(RUNTIME_VARS.TMP, "target")
  136. ret = self.run_state("file.symlink", test=True, name=name, target=tgt)
  137. self.assertSaltNoneReturn(ret)
  138. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  139. def test_absent_file(self):
  140. """
  141. file.absent
  142. """
  143. name = os.path.join(RUNTIME_VARS.TMP, "file_to_kill")
  144. with salt.utils.files.fopen(name, "w+") as fp_:
  145. fp_.write("killme")
  146. ret = self.run_state("file.absent", name=name)
  147. self.assertSaltTrueReturn(ret)
  148. self.assertFalse(os.path.isfile(name))
  149. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  150. def test_absent_dir(self):
  151. """
  152. file.absent
  153. """
  154. name = os.path.join(RUNTIME_VARS.TMP, "dir_to_kill")
  155. if not os.path.isdir(name):
  156. # left behind... Don't fail because of this!
  157. os.makedirs(name)
  158. ret = self.run_state("file.absent", name=name)
  159. self.assertSaltTrueReturn(ret)
  160. self.assertFalse(os.path.isdir(name))
  161. @pytest.mark.slow_test(seconds=120) # Test takes >60 and <=120 seconds
  162. def test_absent_link(self):
  163. """
  164. file.absent
  165. """
  166. name = os.path.join(RUNTIME_VARS.TMP, "link_to_kill")
  167. tgt = "{0}.tgt".format(name)
  168. # Windows must have a source directory to link to
  169. if IS_WINDOWS and not os.path.isdir(tgt):
  170. os.mkdir(tgt)
  171. if not self.run_function("file.is_link", [name]):
  172. self.run_function("file.symlink", [tgt, name])
  173. ret = self.run_state("file.absent", name=name)
  174. try:
  175. self.assertSaltTrueReturn(ret)
  176. self.assertFalse(self.run_function("file.is_link", [name]))
  177. finally:
  178. if self.run_function("file.is_link", [name]):
  179. self.run_function("file.remove", [name])
  180. @with_tempfile()
  181. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  182. def test_test_absent(self, name):
  183. """
  184. file.absent test interface
  185. """
  186. with salt.utils.files.fopen(name, "w+") as fp_:
  187. fp_.write("killme")
  188. ret = self.run_state("file.absent", test=True, name=name)
  189. self.assertSaltNoneReturn(ret)
  190. self.assertTrue(os.path.isfile(name))
  191. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  192. def test_managed(self):
  193. """
  194. file.managed
  195. """
  196. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  197. ret = self.run_state("file.managed", name=name, source="salt://grail/scene33")
  198. src = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  199. with salt.utils.files.fopen(src, "r") as fp_:
  200. master_data = fp_.read()
  201. with salt.utils.files.fopen(name, "r") as fp_:
  202. minion_data = fp_.read()
  203. self.assertEqual(master_data, minion_data)
  204. self.assertSaltTrueReturn(ret)
  205. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  206. def test_managed_file_mode(self):
  207. """
  208. file.managed, correct file permissions
  209. """
  210. desired_mode = 504 # 0770 octal
  211. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  212. ret = self.run_state(
  213. "file.managed", name=name, mode="0770", source="salt://grail/scene33"
  214. )
  215. if IS_WINDOWS:
  216. expected = "The 'mode' option is not supported on Windows"
  217. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  218. self.assertSaltFalseReturn(ret)
  219. return
  220. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  221. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  222. self.assertSaltTrueReturn(ret)
  223. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  224. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  225. def test_managed_file_mode_keep(self):
  226. """
  227. Test using "mode: keep" in a file.managed state
  228. """
  229. _test_managed_file_mode_keep_helper(self, local=False)
  230. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  231. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  232. def test_managed_file_mode_keep_local_source(self):
  233. """
  234. Test using "mode: keep" in a file.managed state, with a local file path
  235. as the source.
  236. """
  237. _test_managed_file_mode_keep_helper(self, local=True)
  238. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  239. def test_managed_file_mode_file_exists_replace(self):
  240. """
  241. file.managed, existing file with replace=True, change permissions
  242. """
  243. initial_mode = 504 # 0770 octal
  244. desired_mode = 384 # 0600 octal
  245. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  246. ret = self.run_state(
  247. "file.managed",
  248. name=name,
  249. mode=oct(initial_mode),
  250. source="salt://grail/scene33",
  251. )
  252. if IS_WINDOWS:
  253. expected = "The 'mode' option is not supported on Windows"
  254. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  255. self.assertSaltFalseReturn(ret)
  256. return
  257. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  258. self.assertEqual(oct(initial_mode), oct(resulting_mode))
  259. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  260. ret = self.run_state(
  261. "file.managed",
  262. name=name,
  263. replace=True,
  264. mode=oct(desired_mode),
  265. source="salt://grail/scene33",
  266. )
  267. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  268. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  269. self.assertSaltTrueReturn(ret)
  270. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  271. def test_managed_file_mode_file_exists_noreplace(self):
  272. """
  273. file.managed, existing file with replace=False, change permissions
  274. """
  275. initial_mode = 504 # 0770 octal
  276. desired_mode = 384 # 0600 octal
  277. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  278. ret = self.run_state(
  279. "file.managed",
  280. name=name,
  281. replace=True,
  282. mode=oct(initial_mode),
  283. source="salt://grail/scene33",
  284. )
  285. if IS_WINDOWS:
  286. expected = "The 'mode' option is not supported on Windows"
  287. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  288. self.assertSaltFalseReturn(ret)
  289. return
  290. ret = self.run_state(
  291. "file.managed",
  292. name=name,
  293. replace=False,
  294. mode=oct(desired_mode),
  295. source="salt://grail/scene33",
  296. )
  297. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  298. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  299. self.assertSaltTrueReturn(ret)
  300. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  301. def test_managed_file_with_grains_data(self):
  302. """
  303. Test to ensure we can render grains data into a managed
  304. file.
  305. """
  306. grain_path = os.path.join(RUNTIME_VARS.TMP, "file-grain-test")
  307. state_file = "file-grainget"
  308. self.run_function("state.sls", [state_file], pillar={"grain_path": grain_path})
  309. self.assertTrue(os.path.exists(grain_path))
  310. with salt.utils.files.fopen(grain_path, "r") as fp_:
  311. file_contents = fp_.readlines()
  312. if IS_WINDOWS:
  313. match = "^minion\r\n"
  314. else:
  315. match = "^minion\n"
  316. self.assertTrue(re.match(match, file_contents[0]))
  317. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  318. def test_managed_file_with_pillar_sls(self):
  319. """
  320. Test to ensure pillar data in sls file
  321. is rendered properly and file is created.
  322. """
  323. file_pillar = os.path.join(RUNTIME_VARS.TMP, "filepillar-python")
  324. self.addCleanup(self._delete_file, file_pillar)
  325. state_name = "file-pillarget"
  326. log.warning("File Path: %s", file_pillar)
  327. ret = self.run_function("state.sls", [state_name])
  328. self.assertSaltTrueReturn(ret)
  329. # Check to make sure the file was created
  330. check_file = self.run_function("file.file_exists", [file_pillar])
  331. self.assertTrue(check_file)
  332. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  333. def test_managed_file_with_pillardefault_sls(self):
  334. """
  335. Test to ensure when pillar data is not available
  336. in sls file with pillar.get it uses the default
  337. value.
  338. """
  339. file_pillar_def = os.path.join(RUNTIME_VARS.TMP, "filepillar-defaultvalue")
  340. self.addCleanup(self._delete_file, file_pillar_def)
  341. state_name = "file-pillardefaultget"
  342. log.warning("File Path: %s", file_pillar_def)
  343. ret = self.run_function("state.sls", [state_name])
  344. self.assertSaltTrueReturn(ret)
  345. # Check to make sure the file was created
  346. check_file = self.run_function("file.file_exists", [file_pillar_def])
  347. self.assertTrue(check_file)
  348. @pytest.mark.skip_if_not_root
  349. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  350. def test_managed_dir_mode(self):
  351. """
  352. Tests to ensure that file.managed creates directories with the
  353. permissions requested with the dir_mode argument
  354. """
  355. desired_mode = 511 # 0777 in octal
  356. name = os.path.join(RUNTIME_VARS.TMP, "a", "managed_dir_mode_test_file")
  357. desired_owner = "nobody"
  358. ret = self.run_state(
  359. "file.managed",
  360. name=name,
  361. source="salt://grail/scene33",
  362. mode=600,
  363. makedirs=True,
  364. user=desired_owner,
  365. dir_mode=oct(desired_mode), # 0777
  366. )
  367. if IS_WINDOWS:
  368. expected = "The 'mode' option is not supported on Windows"
  369. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  370. self.assertSaltFalseReturn(ret)
  371. return
  372. resulting_mode = stat.S_IMODE(
  373. os.stat(os.path.join(RUNTIME_VARS.TMP, "a")).st_mode
  374. )
  375. resulting_owner = pwd.getpwuid(
  376. os.stat(os.path.join(RUNTIME_VARS.TMP, "a")).st_uid
  377. ).pw_name
  378. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  379. self.assertSaltTrueReturn(ret)
  380. self.assertEqual(desired_owner, resulting_owner)
  381. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  382. def test_test_managed(self):
  383. """
  384. file.managed test interface
  385. """
  386. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_not_scene33")
  387. ret = self.run_state(
  388. "file.managed", test=True, name=name, source="salt://grail/scene33"
  389. )
  390. self.assertSaltNoneReturn(ret)
  391. self.assertFalse(os.path.isfile(name))
  392. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  393. def test_managed_show_changes_false(self):
  394. """
  395. file.managed test interface
  396. """
  397. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_scene33")
  398. with salt.utils.files.fopen(name, "wb") as fp_:
  399. fp_.write(b"test_managed_show_changes_false\n")
  400. ret = self.run_state(
  401. "file.managed", name=name, source="salt://grail/scene33", show_changes=False
  402. )
  403. changes = next(six.itervalues(ret))["changes"]
  404. self.assertEqual("<show_changes=False>", changes["diff"])
  405. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  406. def test_managed_show_changes_true(self):
  407. """
  408. file.managed test interface
  409. """
  410. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_scene33")
  411. with salt.utils.files.fopen(name, "wb") as fp_:
  412. fp_.write(b"test_managed_show_changes_false\n")
  413. ret = self.run_state("file.managed", name=name, source="salt://grail/scene33",)
  414. changes = next(six.itervalues(ret))["changes"]
  415. self.assertIn("diff", changes)
  416. @skipIf(IS_WINDOWS, "Don't know how to fix for Windows")
  417. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  418. def test_managed_escaped_file_path(self):
  419. """
  420. file.managed test that 'salt://|' protects unusual characters in file path
  421. """
  422. funny_file = salt.utils.files.mkstemp(
  423. prefix="?f!le? n@=3&", suffix=".file type"
  424. )
  425. funny_file_name = os.path.split(funny_file)[1]
  426. funny_url = "salt://|" + funny_file_name
  427. funny_url_path = os.path.join(RUNTIME_VARS.BASE_FILES, funny_file_name)
  428. state_name = "funny_file"
  429. state_file_name = state_name + ".sls"
  430. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_file_name)
  431. state_key = "file_|-{0}_|-{0}_|-managed".format(funny_file)
  432. self.addCleanup(os.remove, state_file)
  433. self.addCleanup(os.remove, funny_file)
  434. self.addCleanup(os.remove, funny_url_path)
  435. with salt.utils.files.fopen(funny_url_path, "w"):
  436. pass
  437. with salt.utils.files.fopen(state_file, "w") as fp_:
  438. fp_.write(
  439. textwrap.dedent(
  440. """\
  441. {0}:
  442. file.managed:
  443. - source: {1}
  444. - makedirs: True
  445. """.format(
  446. funny_file, funny_url
  447. )
  448. )
  449. )
  450. ret = self.run_function("state.sls", [state_name])
  451. self.assertTrue(ret[state_key]["result"])
  452. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  453. def test_managed_contents(self):
  454. """
  455. test file.managed with contents that is a boolean, string, integer,
  456. float, list, and dictionary
  457. """
  458. state_name = "file-FileTest-test_managed_contents"
  459. state_filename = state_name + ".sls"
  460. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  461. managed_files = {}
  462. state_keys = {}
  463. for typ in ("bool", "str", "int", "float", "list", "dict"):
  464. managed_files[typ] = salt.utils.files.mkstemp()
  465. state_keys[typ] = "file_|-{0} file_|-{1}_|-managed".format(
  466. typ, managed_files[typ]
  467. )
  468. try:
  469. with salt.utils.files.fopen(state_file, "w") as fd_:
  470. fd_.write(
  471. textwrap.dedent(
  472. """\
  473. bool file:
  474. file.managed:
  475. - name: {bool}
  476. - contents: True
  477. str file:
  478. file.managed:
  479. - name: {str}
  480. - contents: Salt was here.
  481. int file:
  482. file.managed:
  483. - name: {int}
  484. - contents: 340282366920938463463374607431768211456
  485. float file:
  486. file.managed:
  487. - name: {float}
  488. - contents: 1.7518e-45 # gravitational coupling constant
  489. list file:
  490. file.managed:
  491. - name: {list}
  492. - contents: [1, 1, 2, 3, 5, 8, 13]
  493. dict file:
  494. file.managed:
  495. - name: {dict}
  496. - contents:
  497. C: charge
  498. P: parity
  499. T: time
  500. """.format(
  501. **managed_files
  502. )
  503. )
  504. )
  505. ret = self.run_function("state.sls", [state_name])
  506. self.assertSaltTrueReturn(ret)
  507. for typ in state_keys:
  508. self.assertTrue(ret[state_keys[typ]]["result"])
  509. self.assertIn("diff", ret[state_keys[typ]]["changes"])
  510. finally:
  511. if os.path.exists(state_file):
  512. os.remove(state_file)
  513. for typ in managed_files:
  514. if os.path.exists(managed_files[typ]):
  515. os.remove(managed_files[typ])
  516. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  517. def test_managed_contents_with_contents_newline(self):
  518. """
  519. test file.managed with contents by using the default content_newline
  520. flag.
  521. """
  522. contents = "test_managed_contents_with_newline_one"
  523. name = os.path.join(RUNTIME_VARS.TMP, "foo")
  524. # Create a file named foo with contents as above but with a \n at EOF
  525. self.run_state(
  526. "file.managed", name=name, contents=contents, contents_newline=True
  527. )
  528. with salt.utils.files.fopen(name, "r") as fp_:
  529. last_line = fp_.read()
  530. self.assertEqual((contents + os.linesep), last_line)
  531. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  532. def test_managed_contents_with_contents_newline_false(self):
  533. """
  534. test file.managed with contents by using the non default content_newline
  535. flag.
  536. """
  537. contents = "test_managed_contents_with_newline_one"
  538. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  539. # Create a file named foo with contents as above but with a \n at EOF
  540. self.run_state(
  541. "file.managed", name=name, contents=contents, contents_newline=False
  542. )
  543. with salt.utils.files.fopen(name, "r") as fp_:
  544. last_line = fp_.read()
  545. self.assertEqual(contents, last_line)
  546. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  547. def test_managed_multiline_contents_with_contents_newline(self):
  548. """
  549. test file.managed with contents by using the non default content_newline
  550. flag.
  551. """
  552. contents = "this is a cookie{}this is another cookie".format(os.linesep)
  553. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  554. # Create a file named foo with contents as above but with a \n at EOF
  555. self.run_state(
  556. "file.managed", name=name, contents=contents, contents_newline=True
  557. )
  558. with salt.utils.files.fopen(name, "r") as fp_:
  559. last_line = fp_.read()
  560. self.assertEqual((contents + os.linesep), last_line)
  561. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  562. def test_managed_multiline_contents_with_contents_newline_false(self):
  563. """
  564. test file.managed with contents by using the non default content_newline
  565. flag.
  566. """
  567. contents = "this is a cookie{}this is another cookie".format(os.linesep)
  568. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  569. # Create a file named foo with contents as above but with a \n at EOF
  570. self.run_state(
  571. "file.managed", name=name, contents=contents, contents_newline=False
  572. )
  573. with salt.utils.files.fopen(name, "r") as fp_:
  574. last_line = fp_.read()
  575. self.assertEqual(contents, last_line)
  576. @pytest.mark.skip_if_not_root
  577. @skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
  578. @skipIf(not salt.utils.path.which("visudo"), "sudo is missing")
  579. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  580. def test_managed_check_cmd(self):
  581. """
  582. Test file.managed passing a basic check_cmd kwarg. See Issue #38111.
  583. """
  584. r_group = "root"
  585. if salt.utils.platform.is_darwin():
  586. r_group = "wheel"
  587. try:
  588. ret = self.run_state(
  589. "file.managed",
  590. name="/tmp/sudoers",
  591. user="root",
  592. group=r_group,
  593. mode=440,
  594. check_cmd="visudo -c -s -f",
  595. )
  596. self.assertSaltTrueReturn(ret)
  597. self.assertInSaltComment("Empty file", ret)
  598. self.assertEqual(
  599. ret["file_|-/tmp/sudoers_|-/tmp/sudoers_|-managed"]["changes"],
  600. {"new": "file /tmp/sudoers created", "mode": "0440"},
  601. )
  602. finally:
  603. # Clean Up File
  604. if os.path.exists("/tmp/sudoers"):
  605. os.remove("/tmp/sudoers")
  606. @pytest.mark.slow_test(seconds=120) # Test takes >60 and <=120 seconds
  607. def test_managed_local_source_with_source_hash(self):
  608. """
  609. Make sure that we enforce the source_hash even with local files
  610. """
  611. name = os.path.join(RUNTIME_VARS.TMP, "local_source_with_source_hash")
  612. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  613. actual_hash = "567fd840bf1548edc35c48eb66cdd78bfdfcccff"
  614. if IS_WINDOWS:
  615. # CRLF vs LF causes a different hash on windows
  616. actual_hash = "f658a0ec121d9c17088795afcc6ff3c43cb9842a"
  617. # Reverse the actual hash
  618. bad_hash = actual_hash[::-1]
  619. def remove_file():
  620. try:
  621. os.remove(name)
  622. except OSError as exc:
  623. if exc.errno != errno.ENOENT:
  624. raise
  625. def do_test(clean=False):
  626. for proto in ("file://", ""):
  627. source = proto + local_path
  628. log.debug("Trying source %s", source)
  629. try:
  630. ret = self.run_state(
  631. "file.managed",
  632. name=name,
  633. source=source,
  634. source_hash="sha1={0}".format(bad_hash),
  635. )
  636. self.assertSaltFalseReturn(ret)
  637. ret = ret[next(iter(ret))]
  638. # Shouldn't be any changes
  639. self.assertFalse(ret["changes"])
  640. # Check that we identified a hash mismatch
  641. self.assertIn("does not match actual checksum", ret["comment"])
  642. ret = self.run_state(
  643. "file.managed",
  644. name=name,
  645. source=source,
  646. source_hash="sha1={0}".format(actual_hash),
  647. )
  648. self.assertSaltTrueReturn(ret)
  649. finally:
  650. if clean:
  651. remove_file()
  652. remove_file()
  653. log.debug("Trying with nonexistant destination file")
  654. do_test()
  655. log.debug("Trying with destination file already present")
  656. with salt.utils.files.fopen(name, "w"):
  657. pass
  658. try:
  659. do_test(clean=False)
  660. finally:
  661. remove_file()
  662. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  663. def test_managed_local_source_does_not_exist(self):
  664. """
  665. Make sure that we exit gracefully when a local source doesn't exist
  666. """
  667. name = os.path.join(RUNTIME_VARS.TMP, "local_source_does_not_exist")
  668. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene99")
  669. for proto in ("file://", ""):
  670. source = proto + local_path
  671. log.debug("Trying source %s", source)
  672. ret = self.run_state("file.managed", name=name, source=source)
  673. self.assertSaltFalseReturn(ret)
  674. ret = ret[next(iter(ret))]
  675. # Shouldn't be any changes
  676. self.assertFalse(ret["changes"])
  677. # Check that we identified a hash mismatch
  678. self.assertIn("does not exist", ret["comment"])
  679. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  680. def test_managed_unicode_jinja_with_tojson_filter(self):
  681. """
  682. Using {{ varname }} with a list or dictionary which contains unicode
  683. types on Python 2 will result in Jinja rendering the "u" prefix on each
  684. string. This tests that using the "tojson" jinja filter will dump them
  685. to a format which can be successfully loaded by our YAML loader.
  686. The two lines that should end up being rendered are meant to test two
  687. issues that would trip up PyYAML if the "tojson" filter were not used:
  688. 1. A unicode string type would be loaded as a unicode literal with the
  689. leading "u" as well as the quotes, rather than simply being loaded
  690. as the proper unicode type which matches the content of the string
  691. literal. In other words, u'foo' would be loaded literally as
  692. u"u'foo'". This test includes actual non-ascii unicode in one of the
  693. strings to confirm that this also handles these international
  694. characters properly.
  695. 2. Any unicode string type (such as a URL) which contains a colon would
  696. cause a ScannerError in PyYAML, as it would be assumed to delimit a
  697. mapping node.
  698. Dumping the data structure to JSON using the "tojson" jinja filter
  699. should produce an inline data structure which is valid YAML and will be
  700. loaded properly by our YAML loader.
  701. """
  702. test_file = os.path.join(RUNTIME_VARS.TMP, "test-tojson.txt")
  703. ret = self.run_function(
  704. "state.apply", mods="tojson", pillar={"tojson-file": test_file}
  705. )
  706. ret = ret[next(iter(ret))]
  707. assert ret["result"], ret
  708. with salt.utils.files.fopen(test_file, mode="rb") as fp_:
  709. managed = salt.utils.stringutils.to_unicode(fp_.read())
  710. expected = dedent(
  711. """\
  712. Die Webseite ist https://saltstack.com.
  713. Der Zucker ist süß.
  714. """
  715. )
  716. assert managed == expected, "{0!r} != {1!r}".format(managed, expected)
  717. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  718. def test_managed_source_hash_indifferent_case(self):
  719. """
  720. Test passing a source_hash as an uppercase hash.
  721. This is a regression test for Issue #38914 and Issue #48230 (test=true use).
  722. """
  723. name = os.path.join(RUNTIME_VARS.TMP, "source_hash_indifferent_case")
  724. state_name = "file_|-{0}_|" "-{0}_|-managed".format(name)
  725. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "hello_world.txt")
  726. actual_hash = "c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31"
  727. if IS_WINDOWS:
  728. # CRLF vs LF causes a differnt hash on windows
  729. actual_hash = (
  730. "92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925"
  731. )
  732. uppercase_hash = actual_hash.upper()
  733. try:
  734. # Lay down tmp file to test against
  735. self.run_state(
  736. "file.managed", name=name, source=local_path, source_hash=actual_hash
  737. )
  738. # Test uppercase source_hash: should return True with no changes
  739. ret = self.run_state(
  740. "file.managed", name=name, source=local_path, source_hash=uppercase_hash
  741. )
  742. assert ret[state_name]["result"] is True
  743. assert ret[state_name]["changes"] == {}
  744. # Test uppercase source_hash using test=true
  745. # Should return True with no changes
  746. ret = self.run_state(
  747. "file.managed",
  748. name=name,
  749. source=local_path,
  750. source_hash=uppercase_hash,
  751. test=True,
  752. )
  753. assert ret[state_name]["result"] is True
  754. assert ret[state_name]["changes"] == {}
  755. finally:
  756. # Clean Up File
  757. if os.path.exists(name):
  758. os.remove(name)
  759. @with_tempfile(create=False)
  760. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  761. def test_managed_latin1_diff(self, name):
  762. """
  763. Tests that latin-1 file contents are represented properly in the diff
  764. """
  765. # Lay down the initial file
  766. ret = self.run_state(
  767. "file.managed", name=name, source="salt://issue-48777/old.html"
  768. )
  769. ret = ret[next(iter(ret))]
  770. assert ret["result"] is True, ret
  771. # Replace it with the new file and check the diff
  772. ret = self.run_state(
  773. "file.managed", name=name, source="salt://issue-48777/new.html"
  774. )
  775. ret = ret[next(iter(ret))]
  776. assert ret["result"] is True, ret
  777. diff_lines = ret["changes"]["diff"].split(os.linesep)
  778. assert "+räksmörgås" in diff_lines, diff_lines
  779. @with_tempfile()
  780. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  781. def test_managed_keep_source_false_salt(self, name):
  782. """
  783. This test ensures that we properly clean the cached file if keep_source
  784. is set to False, for source files using a salt:// URL
  785. """
  786. source = "salt://grail/scene33"
  787. saltenv = "base"
  788. # Run the state
  789. ret = self.run_state(
  790. "file.managed", name=name, source=source, saltenv=saltenv, keep_source=False
  791. )
  792. ret = ret[next(iter(ret))]
  793. assert ret["result"] is True
  794. # Now make sure that the file is not cached
  795. result = self.run_function("cp.is_cached", [source, saltenv])
  796. assert result == "", "File is still cached at {0}".format(result)
  797. @with_tempfile(create=False)
  798. @with_tempfile(create=False)
  799. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  800. def test_file_managed_onchanges(self, file1, file2):
  801. """
  802. Test file.managed state with onchanges
  803. """
  804. pillar = {
  805. "file1": file1,
  806. "file2": file2,
  807. "source": "salt://testfile",
  808. "req": "onchanges",
  809. }
  810. # Lay down the file used in the below SLS to ensure that when it is
  811. # run, there are no changes.
  812. self.run_state("file.managed", name=pillar["file2"], source=pillar["source"])
  813. ret = self.repack_state_returns(
  814. self.run_function(
  815. "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
  816. )
  817. )
  818. # The file states should both exit with None
  819. assert ret["one"]["result"] is None, ret["one"]["result"]
  820. assert ret["three"]["result"] is True, ret["three"]["result"]
  821. # The first file state should have changes, since a new file was
  822. # created. The other one should not, since we already created that file
  823. # before applying the SLS file.
  824. assert ret["one"]["changes"]
  825. assert not ret["three"]["changes"], ret["three"]["changes"]
  826. # The state watching 'one' should have been run due to changes
  827. assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]
  828. # The state watching 'three' should not have been run
  829. assert (
  830. ret["four"]["comment"]
  831. == "State was not run because none of the onchanges reqs changed"
  832. ), ret["four"]["comment"]
  833. @with_tempfile(create=False)
  834. @with_tempfile(create=False)
  835. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  836. def test_file_managed_prereq(self, file1, file2):
  837. """
  838. Test file.managed state with prereq
  839. """
  840. pillar = {
  841. "file1": file1,
  842. "file2": file2,
  843. "source": "salt://testfile",
  844. "req": "prereq",
  845. }
  846. # Lay down the file used in the below SLS to ensure that when it is
  847. # run, there are no changes.
  848. self.run_state("file.managed", name=pillar["file2"], source=pillar["source"])
  849. ret = self.repack_state_returns(
  850. self.run_function(
  851. "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
  852. )
  853. )
  854. # The file states should both exit with None
  855. assert ret["one"]["result"] is None, ret["one"]["result"]
  856. assert ret["three"]["result"] is True, ret["three"]["result"]
  857. # The first file state should have changes, since a new file was
  858. # created. The other one should not, since we already created that file
  859. # before applying the SLS file.
  860. assert ret["one"]["changes"]
  861. assert not ret["three"]["changes"], ret["three"]["changes"]
  862. # The state watching 'one' should have been run due to changes
  863. assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]
  864. # The state watching 'three' should not have been run
  865. assert ret["four"]["comment"] == "No changes detected", ret["four"]["comment"]
  866. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  867. def test_directory(self):
  868. """
  869. file.directory
  870. """
  871. name = os.path.join(RUNTIME_VARS.TMP, "a_new_dir")
  872. ret = self.run_state("file.directory", name=name)
  873. self.assertSaltTrueReturn(ret)
  874. self.assertTrue(os.path.isdir(name))
  875. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  876. def test_directory_symlink_dry_run(self):
  877. """
  878. Ensure that symlinks are followed when file.directory is run with
  879. test=True
  880. """
  881. try:
  882. tmp_dir = os.path.join(RUNTIME_VARS.TMP, "pgdata")
  883. sym_dir = os.path.join(RUNTIME_VARS.TMP, "pg_data")
  884. os.mkdir(tmp_dir, 0o700)
  885. self.run_function("file.symlink", [tmp_dir, sym_dir])
  886. if IS_WINDOWS:
  887. ret = self.run_state(
  888. "file.directory",
  889. test=True,
  890. name=sym_dir,
  891. follow_symlinks=True,
  892. win_owner="Administrators",
  893. )
  894. else:
  895. ret = self.run_state(
  896. "file.directory",
  897. test=True,
  898. name=sym_dir,
  899. follow_symlinks=True,
  900. mode=700,
  901. )
  902. self.assertSaltTrueReturn(ret)
  903. finally:
  904. if os.path.isdir(tmp_dir):
  905. self.run_function("file.remove", [tmp_dir])
  906. if os.path.islink(sym_dir):
  907. self.run_function("file.remove", [sym_dir])
  908. @pytest.mark.skip_if_not_root
  909. @skipIf(IS_WINDOWS, "Mode not available in Windows")
  910. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  911. def test_directory_max_depth(self):
  912. """
  913. file.directory
  914. Test the max_depth option by iteratively increasing the depth and
  915. checking that no changes deeper than max_depth have been attempted
  916. """
  917. def _get_oct_mode(name):
  918. """
  919. Return a string octal representation of the permissions for name
  920. """
  921. return salt.utils.files.normalize_mode(oct(os.stat(name).st_mode & 0o777))
  922. top = os.path.join(RUNTIME_VARS.TMP, "top_dir")
  923. sub = os.path.join(top, "sub_dir")
  924. subsub = os.path.join(sub, "sub_sub_dir")
  925. dirs = [top, sub, subsub]
  926. initial_mode = "0111"
  927. changed_mode = "0555"
  928. initial_modes = {
  929. 0: {sub: "0755", subsub: "0111"},
  930. 1: {sub: "0111", subsub: "0111"},
  931. 2: {sub: "0111", subsub: "0111"},
  932. }
  933. if not os.path.isdir(subsub):
  934. os.makedirs(subsub, int(initial_mode, 8))
  935. try:
  936. for depth in range(0, 3):
  937. ret = self.run_state(
  938. "file.directory",
  939. name=top,
  940. max_depth=depth,
  941. dir_mode=changed_mode,
  942. recurse=["mode"],
  943. )
  944. self.assertSaltTrueReturn(ret)
  945. for changed_dir in dirs[0 : depth + 1]:
  946. self.assertEqual(changed_mode, _get_oct_mode(changed_dir))
  947. for untouched_dir in dirs[depth + 1 :]:
  948. # Beginning in Python 3.7, os.makedirs no longer sets
  949. # the mode of intermediate directories to the mode that
  950. # is passed.
  951. if sys.version_info >= (3, 7):
  952. _mode = initial_modes[depth][untouched_dir]
  953. self.assertEqual(_mode, _get_oct_mode(untouched_dir))
  954. else:
  955. self.assertEqual(initial_mode, _get_oct_mode(untouched_dir))
  956. finally:
  957. shutil.rmtree(top)
  958. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  959. def test_test_directory(self):
  960. """
  961. file.directory
  962. """
  963. name = os.path.join(RUNTIME_VARS.TMP, "a_not_dir")
  964. ret = self.run_state("file.directory", test=True, name=name)
  965. self.assertSaltNoneReturn(ret)
  966. self.assertFalse(os.path.isdir(name))
  967. @with_tempdir()
  968. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  969. def test_directory_clean(self, base_dir):
  970. """
  971. file.directory with clean=True
  972. """
  973. name = os.path.join(base_dir, "directory_clean_dir")
  974. os.mkdir(name)
  975. strayfile = os.path.join(name, "strayfile")
  976. with salt.utils.files.fopen(strayfile, "w"):
  977. pass
  978. straydir = os.path.join(name, "straydir")
  979. if not os.path.isdir(straydir):
  980. os.makedirs(straydir)
  981. with salt.utils.files.fopen(os.path.join(straydir, "strayfile2"), "w"):
  982. pass
  983. ret = self.run_state("file.directory", name=name, clean=True)
  984. self.assertSaltTrueReturn(ret)
  985. self.assertFalse(os.path.exists(strayfile))
  986. self.assertFalse(os.path.exists(straydir))
  987. self.assertTrue(os.path.isdir(name))
  988. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  989. def test_directory_is_idempotent(self):
  990. """
  991. Ensure the file.directory state produces no changes when rerun.
  992. """
  993. name = os.path.join(RUNTIME_VARS.TMP, "a_dir_twice")
  994. if IS_WINDOWS:
  995. username = os.environ.get("USERNAME", "Administrators")
  996. domain = os.environ.get("USERDOMAIN", "")
  997. fullname = "{0}\\{1}".format(domain, username)
  998. ret = self.run_state("file.directory", name=name, win_owner=fullname)
  999. else:
  1000. ret = self.run_state("file.directory", name=name)
  1001. self.assertSaltTrueReturn(ret)
  1002. if IS_WINDOWS:
  1003. ret = self.run_state("file.directory", name=name, win_owner=username)
  1004. else:
  1005. ret = self.run_state("file.directory", name=name)
  1006. self.assertSaltTrueReturn(ret)
  1007. self.assertSaltStateChangesEqual(ret, {})
  1008. @with_tempdir()
  1009. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1010. def test_directory_clean_exclude(self, base_dir):
  1011. """
  1012. file.directory with clean=True and exclude_pat set
  1013. """
  1014. name = os.path.join(base_dir, "directory_clean_dir")
  1015. if not os.path.isdir(name):
  1016. os.makedirs(name)
  1017. strayfile = os.path.join(name, "strayfile")
  1018. with salt.utils.files.fopen(strayfile, "w"):
  1019. pass
  1020. straydir = os.path.join(name, "straydir")
  1021. if not os.path.isdir(straydir):
  1022. os.makedirs(straydir)
  1023. strayfile2 = os.path.join(straydir, "strayfile2")
  1024. with salt.utils.files.fopen(strayfile2, "w"):
  1025. pass
  1026. keepfile = os.path.join(straydir, "keepfile")
  1027. with salt.utils.files.fopen(keepfile, "w"):
  1028. pass
  1029. exclude_pat = "E@^straydir(|/keepfile)$"
  1030. if IS_WINDOWS:
  1031. exclude_pat = "E@^straydir(|\\\\keepfile)$"
  1032. ret = self.run_state(
  1033. "file.directory", name=name, clean=True, exclude_pat=exclude_pat
  1034. )
  1035. self.assertSaltTrueReturn(ret)
  1036. self.assertFalse(os.path.exists(strayfile))
  1037. self.assertFalse(os.path.exists(strayfile2))
  1038. self.assertTrue(os.path.exists(keepfile))
  1039. @skipIf(IS_WINDOWS, "Skip on windows")
  1040. @with_tempdir()
  1041. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  1042. def test_test_directory_clean_exclude(self, base_dir):
  1043. """
  1044. file.directory with test=True, clean=True and exclude_pat set
  1045. Skipped on windows because clean and exclude_pat not supported by
  1046. salt.sates.file._check_directory_win
  1047. """
  1048. name = os.path.join(base_dir, "directory_clean_dir")
  1049. os.mkdir(name)
  1050. strayfile = os.path.join(name, "strayfile")
  1051. with salt.utils.files.fopen(strayfile, "w"):
  1052. pass
  1053. straydir = os.path.join(name, "straydir")
  1054. if not os.path.isdir(straydir):
  1055. os.makedirs(straydir)
  1056. strayfile2 = os.path.join(straydir, "strayfile2")
  1057. with salt.utils.files.fopen(strayfile2, "w"):
  1058. pass
  1059. keepfile = os.path.join(straydir, "keepfile")
  1060. with salt.utils.files.fopen(keepfile, "w"):
  1061. pass
  1062. exclude_pat = "E@^straydir(|/keepfile)$"
  1063. if IS_WINDOWS:
  1064. exclude_pat = "E@^straydir(|\\\\keepfile)$"
  1065. ret = self.run_state(
  1066. "file.directory", test=True, name=name, clean=True, exclude_pat=exclude_pat
  1067. )
  1068. comment = next(six.itervalues(ret))["comment"]
  1069. self.assertSaltNoneReturn(ret)
  1070. self.assertTrue(os.path.exists(strayfile))
  1071. self.assertTrue(os.path.exists(strayfile2))
  1072. self.assertTrue(os.path.exists(keepfile))
  1073. self.assertIn(strayfile, comment)
  1074. self.assertIn(strayfile2, comment)
  1075. self.assertNotIn(keepfile, comment)
  1076. @with_tempdir()
  1077. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1078. def test_directory_clean_require_in(self, name):
  1079. """
  1080. file.directory test with clean=True and require_in file
  1081. """
  1082. state_name = "file-FileTest-test_directory_clean_require_in"
  1083. state_filename = state_name + ".sls"
  1084. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1085. wrong_file = os.path.join(name, "wrong")
  1086. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1087. fp.write("foo")
  1088. good_file = os.path.join(name, "bar")
  1089. with salt.utils.files.fopen(state_file, "w") as fp:
  1090. self.addCleanup(lambda: os.remove(state_file))
  1091. fp.write(
  1092. textwrap.dedent(
  1093. """\
  1094. some_dir:
  1095. file.directory:
  1096. - name: {name}
  1097. - clean: true
  1098. {good_file}:
  1099. file.managed:
  1100. - require_in:
  1101. - file: some_dir
  1102. """.format(
  1103. name=name, good_file=good_file
  1104. )
  1105. )
  1106. )
  1107. ret = self.run_function("state.sls", [state_name])
  1108. self.assertTrue(os.path.exists(good_file))
  1109. self.assertFalse(os.path.exists(wrong_file))
  1110. @with_tempdir()
  1111. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1112. def test_directory_clean_require_in_with_id(self, name):
  1113. """
  1114. file.directory test with clean=True and require_in file with an ID
  1115. different from the file name
  1116. """
  1117. state_name = "file-FileTest-test_directory_clean_require_in_with_id"
  1118. state_filename = state_name + ".sls"
  1119. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1120. wrong_file = os.path.join(name, "wrong")
  1121. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1122. fp.write("foo")
  1123. good_file = os.path.join(name, "bar")
  1124. with salt.utils.files.fopen(state_file, "w") as fp:
  1125. self.addCleanup(lambda: os.remove(state_file))
  1126. fp.write(
  1127. textwrap.dedent(
  1128. """\
  1129. some_dir:
  1130. file.directory:
  1131. - name: {name}
  1132. - clean: true
  1133. some_file:
  1134. file.managed:
  1135. - name: {good_file}
  1136. - require_in:
  1137. - file: some_dir
  1138. """.format(
  1139. name=name, good_file=good_file
  1140. )
  1141. )
  1142. )
  1143. ret = self.run_function("state.sls", [state_name])
  1144. self.assertTrue(os.path.exists(good_file))
  1145. self.assertFalse(os.path.exists(wrong_file))
  1146. @skipIf(
  1147. salt.utils.platform.is_darwin(),
  1148. "WAR ROOM TEMPORARY SKIP, Test is flaky on macosx",
  1149. )
  1150. @with_tempdir()
  1151. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1152. def test_directory_clean_require_with_name(self, name):
  1153. """
  1154. file.directory test with clean=True and require with a file state
  1155. relatively to the state's name, not its ID.
  1156. """
  1157. state_name = "file-FileTest-test_directory_clean_require_in_with_id"
  1158. state_filename = state_name + ".sls"
  1159. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1160. wrong_file = os.path.join(name, "wrong")
  1161. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1162. fp.write("foo")
  1163. good_file = os.path.join(name, "bar")
  1164. with salt.utils.files.fopen(state_file, "w") as fp:
  1165. self.addCleanup(lambda: os.remove(state_file))
  1166. fp.write(
  1167. textwrap.dedent(
  1168. """\
  1169. some_dir:
  1170. file.directory:
  1171. - name: {name}
  1172. - clean: true
  1173. - require:
  1174. # This requirement refers to the name of the following
  1175. # state, not its ID.
  1176. - file: {good_file}
  1177. some_file:
  1178. file.managed:
  1179. - name: {good_file}
  1180. """.format(
  1181. name=name, good_file=good_file
  1182. )
  1183. )
  1184. )
  1185. ret = self.run_function("state.sls", [state_name])
  1186. self.assertTrue(os.path.exists(good_file))
  1187. self.assertFalse(os.path.exists(wrong_file))
  1188. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1189. def test_directory_broken_symlink(self):
  1190. """
  1191. Ensure that file.directory works even if a directory
  1192. contains broken symbolic link
  1193. """
  1194. try:
  1195. tmp_dir = os.path.join(RUNTIME_VARS.TMP, "foo")
  1196. null_file = "{0}/null".format(tmp_dir)
  1197. broken_link = "{0}/broken".format(tmp_dir)
  1198. os.mkdir(tmp_dir, 0o700)
  1199. self.run_function("file.symlink", [null_file, broken_link])
  1200. if IS_WINDOWS:
  1201. ret = self.run_state(
  1202. "file.directory",
  1203. name=tmp_dir,
  1204. recurse=["mode"],
  1205. follow_symlinks=True,
  1206. win_owner="Administrators",
  1207. )
  1208. else:
  1209. ret = self.run_state(
  1210. "file.directory",
  1211. name=tmp_dir,
  1212. recurse=["mode"],
  1213. file_mode=644,
  1214. dir_mode=755,
  1215. )
  1216. self.assertSaltTrueReturn(ret)
  1217. finally:
  1218. if os.path.isdir(tmp_dir):
  1219. self.run_function("file.remove", [tmp_dir])
  1220. @with_tempdir(create=False)
  1221. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1222. def test_recurse(self, name):
  1223. """
  1224. file.recurse
  1225. """
  1226. ret = self.run_state("file.recurse", name=name, source="salt://grail")
  1227. self.assertSaltTrueReturn(ret)
  1228. self.assertTrue(os.path.isfile(os.path.join(name, "36", "scene")))
  1229. @with_tempdir(create=False)
  1230. @with_tempdir(create=False)
  1231. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1232. def test_recurse_specific_env(self, dir1, dir2):
  1233. """
  1234. file.recurse passing __env__
  1235. """
  1236. ret = self.run_state(
  1237. "file.recurse", name=dir1, source="salt://holy", __env__="prod"
  1238. )
  1239. self.assertSaltTrueReturn(ret)
  1240. self.assertTrue(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1241. ret = self.run_state(
  1242. "file.recurse", name=dir2, source="salt://holy", saltenv="prod"
  1243. )
  1244. self.assertSaltTrueReturn(ret)
  1245. self.assertTrue(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1246. @with_tempdir(create=False)
  1247. @with_tempdir(create=False)
  1248. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1249. def test_recurse_specific_env_in_url(self, dir1, dir2):
  1250. """
  1251. file.recurse passing __env__
  1252. """
  1253. ret = self.run_state(
  1254. "file.recurse", name=dir1, source="salt://holy?saltenv=prod"
  1255. )
  1256. self.assertSaltTrueReturn(ret)
  1257. self.assertTrue(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1258. ret = self.run_state(
  1259. "file.recurse", name=dir2, source="salt://holy?saltenv=prod"
  1260. )
  1261. self.assertSaltTrueReturn(ret)
  1262. self.assertTrue(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1263. @with_tempdir(create=False)
  1264. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1265. def test_test_recurse(self, name):
  1266. """
  1267. file.recurse test interface
  1268. """
  1269. ret = self.run_state(
  1270. "file.recurse", test=True, name=name, source="salt://grail",
  1271. )
  1272. self.assertSaltNoneReturn(ret)
  1273. self.assertFalse(os.path.isfile(os.path.join(name, "36", "scene")))
  1274. self.assertFalse(os.path.exists(name))
  1275. @with_tempdir(create=False)
  1276. @with_tempdir(create=False)
  1277. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1278. def test_test_recurse_specific_env(self, dir1, dir2):
  1279. """
  1280. file.recurse test interface
  1281. """
  1282. ret = self.run_state(
  1283. "file.recurse", test=True, name=dir1, source="salt://holy", __env__="prod"
  1284. )
  1285. self.assertSaltNoneReturn(ret)
  1286. self.assertFalse(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1287. self.assertFalse(os.path.exists(dir1))
  1288. ret = self.run_state(
  1289. "file.recurse", test=True, name=dir2, source="salt://holy", saltenv="prod"
  1290. )
  1291. self.assertSaltNoneReturn(ret)
  1292. self.assertFalse(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1293. self.assertFalse(os.path.exists(dir2))
  1294. @with_tempdir(create=False)
  1295. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1296. def test_recurse_template(self, name):
  1297. """
  1298. file.recurse with jinja template enabled
  1299. """
  1300. _ts = "TEMPLATE TEST STRING"
  1301. ret = self.run_state(
  1302. "file.recurse",
  1303. name=name,
  1304. source="salt://grail",
  1305. template="jinja",
  1306. defaults={"spam": _ts},
  1307. )
  1308. self.assertSaltTrueReturn(ret)
  1309. with salt.utils.files.fopen(os.path.join(name, "scene33"), "r") as fp_:
  1310. contents = fp_.read()
  1311. self.assertIn(_ts, contents)
  1312. @with_tempdir()
  1313. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1314. def test_recurse_clean(self, name):
  1315. """
  1316. file.recurse with clean=True
  1317. """
  1318. strayfile = os.path.join(name, "strayfile")
  1319. with salt.utils.files.fopen(strayfile, "w"):
  1320. pass
  1321. # Corner cases: replacing file with a directory and vice versa
  1322. with salt.utils.files.fopen(os.path.join(name, "36"), "w"):
  1323. pass
  1324. os.makedirs(os.path.join(name, "scene33"))
  1325. ret = self.run_state(
  1326. "file.recurse", name=name, source="salt://grail", clean=True
  1327. )
  1328. self.assertSaltTrueReturn(ret)
  1329. self.assertFalse(os.path.exists(strayfile))
  1330. self.assertTrue(os.path.isfile(os.path.join(name, "36", "scene")))
  1331. self.assertTrue(os.path.isfile(os.path.join(name, "scene33")))
  1332. @with_tempdir()
  1333. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1334. def test_recurse_clean_specific_env(self, name):
  1335. """
  1336. file.recurse with clean=True and __env__=prod
  1337. """
  1338. strayfile = os.path.join(name, "strayfile")
  1339. with salt.utils.files.fopen(strayfile, "w"):
  1340. pass
  1341. # Corner cases: replacing file with a directory and vice versa
  1342. with salt.utils.files.fopen(os.path.join(name, "32"), "w"):
  1343. pass
  1344. os.makedirs(os.path.join(name, "scene34"))
  1345. ret = self.run_state(
  1346. "file.recurse", name=name, source="salt://holy", clean=True, __env__="prod"
  1347. )
  1348. self.assertSaltTrueReturn(ret)
  1349. self.assertFalse(os.path.exists(strayfile))
  1350. self.assertTrue(os.path.isfile(os.path.join(name, "32", "scene")))
  1351. self.assertTrue(os.path.isfile(os.path.join(name, "scene34")))
  1352. @skipIf(IS_WINDOWS, "Skip on windows")
  1353. @with_tempdir()
  1354. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  1355. def test_recurse_issue_34945(self, base_dir):
  1356. """
  1357. This tests the case where the source dir for the file.recurse state
  1358. does not contain any files (only subdirectories), and the dir_mode is
  1359. being managed. For a long time, this corner case resulted in the top
  1360. level of the destination directory being created with the wrong initial
  1361. permissions, a problem that would be corrected later on in the
  1362. file.recurse state via running state.directory. However, the
  1363. file.directory state only gets called when there are files to be
  1364. managed in that directory, and when the source directory contains only
  1365. subdirectories, the incorrectly-set initial perms would not be
  1366. repaired.
  1367. This was fixed in https://github.com/saltstack/salt/pull/35309
  1368. Skipped on windows because dir_mode is not supported.
  1369. """
  1370. dir_mode = "2775"
  1371. issue_dir = "issue-34945"
  1372. name = os.path.join(base_dir, issue_dir)
  1373. ret = self.run_state(
  1374. "file.recurse", name=name, source="salt://" + issue_dir, dir_mode=dir_mode
  1375. )
  1376. self.assertSaltTrueReturn(ret)
  1377. actual_dir_mode = oct(stat.S_IMODE(os.stat(name).st_mode))[-4:]
  1378. self.assertEqual(dir_mode, actual_dir_mode)
  1379. @with_tempdir(create=False)
  1380. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1381. def test_recurse_issue_40578(self, name):
  1382. """
  1383. This ensures that the state doesn't raise an exception when it
  1384. encounters a file with a unicode filename in the process of invoking
  1385. file.source_list.
  1386. """
  1387. ret = self.run_state("file.recurse", name=name, source="salt://соль")
  1388. self.assertSaltTrueReturn(ret)
  1389. if six.PY2 and IS_WINDOWS:
  1390. # Providing unicode to os.listdir so that we avoid having listdir
  1391. # try to decode the filenames using the systemencoding on windows
  1392. # python 2.
  1393. files = os.listdir(name.decode("utf-8"))
  1394. else:
  1395. files = salt.utils.data.decode(os.listdir(name), normalize=True)
  1396. self.assertEqual(
  1397. sorted(files), sorted(["foo.txt", "спам.txt", "яйца.txt"]),
  1398. )
  1399. @with_tempfile()
  1400. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1401. def test_replace(self, name):
  1402. """
  1403. file.replace
  1404. """
  1405. with salt.utils.files.fopen(name, "w+") as fp_:
  1406. fp_.write("change_me")
  1407. ret = self.run_state(
  1408. "file.replace", name=name, pattern="change", repl="salt", backup=False
  1409. )
  1410. with salt.utils.files.fopen(name, "r") as fp_:
  1411. self.assertIn("salt", fp_.read())
  1412. self.assertSaltTrueReturn(ret)
  1413. @with_tempdir()
  1414. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1415. def test_replace_issue_18612(self, base_dir):
  1416. """
  1417. Test the (mis-)behaviour of file.replace as described in #18612:
  1418. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1419. an infinitely growing file as 'file.replace' didn't check beforehand
  1420. whether the changes had already been done to the file
  1421. # Case description:
  1422. The tested file contains one commented line
  1423. The commented line should be uncommented in the end, nothing else should change
  1424. """
  1425. test_name = "test_replace_issue_18612"
  1426. path_test = os.path.join(base_dir, test_name)
  1427. with salt.utils.files.fopen(path_test, "w+") as fp_test_:
  1428. fp_test_.write("# en_US.UTF-8")
  1429. ret = []
  1430. for x in range(0, 3):
  1431. ret.append(
  1432. self.run_state(
  1433. "file.replace",
  1434. name=path_test,
  1435. pattern="^# en_US.UTF-8$",
  1436. repl="en_US.UTF-8",
  1437. append_if_not_found=True,
  1438. )
  1439. )
  1440. # ensure, the number of lines didn't change, even after invoking 'file.replace' 3 times
  1441. with salt.utils.files.fopen(path_test, "r") as fp_test_:
  1442. self.assertTrue((sum(1 for _ in fp_test_) == 1))
  1443. # ensure, the replacement succeeded
  1444. with salt.utils.files.fopen(path_test, "r") as fp_test_:
  1445. self.assertTrue(fp_test_.read().startswith("en_US.UTF-8"))
  1446. # ensure, all runs of 'file.replace' reported success
  1447. for item in ret:
  1448. self.assertSaltTrueReturn(item)
  1449. @with_tempdir()
  1450. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1451. def test_replace_issue_18612_prepend(self, base_dir):
  1452. """
  1453. Test the (mis-)behaviour of file.replace as described in #18612:
  1454. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1455. an infinitely growing file as 'file.replace' didn't check beforehand
  1456. whether the changes had already been done to the file
  1457. # Case description:
  1458. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1459. The replacement pattern should be prepended to the file
  1460. """
  1461. test_name = "test_replace_issue_18612_prepend"
  1462. path_in = os.path.join(
  1463. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1464. )
  1465. path_out = os.path.join(
  1466. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1467. )
  1468. path_test = os.path.join(base_dir, test_name)
  1469. # create test file based on initial template
  1470. shutil.copyfile(path_in, path_test)
  1471. ret = []
  1472. for x in range(0, 3):
  1473. ret.append(
  1474. self.run_state(
  1475. "file.replace",
  1476. name=path_test,
  1477. pattern="^# en_US.UTF-8$",
  1478. repl="en_US.UTF-8",
  1479. prepend_if_not_found=True,
  1480. )
  1481. )
  1482. # ensure, the resulting file contains the expected lines
  1483. self.assertTrue(filecmp.cmp(path_test, path_out))
  1484. # ensure the initial file was properly backed up
  1485. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1486. # ensure, all runs of 'file.replace' reported success
  1487. for item in ret:
  1488. self.assertSaltTrueReturn(item)
  1489. @with_tempdir()
  1490. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1491. def test_replace_issue_18612_append(self, base_dir):
  1492. """
  1493. Test the (mis-)behaviour of file.replace as described in #18612:
  1494. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1495. an infinitely growing file as 'file.replace' didn't check beforehand
  1496. whether the changes had already been done to the file
  1497. # Case description:
  1498. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1499. The replacement pattern should be appended to the file
  1500. """
  1501. test_name = "test_replace_issue_18612_append"
  1502. path_in = os.path.join(
  1503. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1504. )
  1505. path_out = os.path.join(
  1506. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1507. )
  1508. path_test = os.path.join(base_dir, test_name)
  1509. # create test file based on initial template
  1510. shutil.copyfile(path_in, path_test)
  1511. ret = []
  1512. for x in range(0, 3):
  1513. ret.append(
  1514. self.run_state(
  1515. "file.replace",
  1516. name=path_test,
  1517. pattern="^# en_US.UTF-8$",
  1518. repl="en_US.UTF-8",
  1519. append_if_not_found=True,
  1520. )
  1521. )
  1522. # ensure, the resulting file contains the expected lines
  1523. self.assertTrue(filecmp.cmp(path_test, path_out))
  1524. # ensure the initial file was properly backed up
  1525. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1526. # ensure, all runs of 'file.replace' reported success
  1527. for item in ret:
  1528. self.assertSaltTrueReturn(item)
  1529. @with_tempdir()
  1530. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1531. def test_replace_issue_18612_append_not_found_content(self, base_dir):
  1532. """
  1533. Test the (mis-)behaviour of file.replace as described in #18612:
  1534. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1535. an infinitely growing file as 'file.replace' didn't check beforehand
  1536. whether the changes had already been done to the file
  1537. # Case description:
  1538. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1539. The 'not_found_content' value should be appended to the file
  1540. """
  1541. test_name = "test_replace_issue_18612_append_not_found_content"
  1542. path_in = os.path.join(
  1543. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1544. )
  1545. path_out = os.path.join(
  1546. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1547. )
  1548. path_test = os.path.join(base_dir, test_name)
  1549. # create test file based on initial template
  1550. shutil.copyfile(path_in, path_test)
  1551. ret = []
  1552. for x in range(0, 3):
  1553. ret.append(
  1554. self.run_state(
  1555. "file.replace",
  1556. name=path_test,
  1557. pattern="^# en_US.UTF-8$",
  1558. repl="en_US.UTF-8",
  1559. append_if_not_found=True,
  1560. not_found_content="THIS LINE WASN'T FOUND! SO WE'RE APPENDING IT HERE!",
  1561. )
  1562. )
  1563. # ensure, the resulting file contains the expected lines
  1564. self.assertTrue(filecmp.cmp(path_test, path_out))
  1565. # ensure the initial file was properly backed up
  1566. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1567. # ensure, all runs of 'file.replace' reported success
  1568. for item in ret:
  1569. self.assertSaltTrueReturn(item)
  1570. @with_tempdir()
  1571. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1572. def test_replace_issue_18612_change_mid_line_with_comment(self, base_dir):
  1573. """
  1574. Test the (mis-)behaviour of file.replace as described in #18612:
  1575. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1576. an infinitely growing file as 'file.replace' didn't check beforehand
  1577. whether the changes had already been done to the file
  1578. # Case description:
  1579. The tested file contains 5 key=value pairs
  1580. The commented key=value pair #foo=bar should be changed to foo=salt
  1581. The comment char (#) in front of foo=bar should be removed
  1582. """
  1583. test_name = "test_replace_issue_18612_change_mid_line_with_comment"
  1584. path_in = os.path.join(
  1585. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1586. )
  1587. path_out = os.path.join(
  1588. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1589. )
  1590. path_test = os.path.join(base_dir, test_name)
  1591. # create test file based on initial template
  1592. shutil.copyfile(path_in, path_test)
  1593. ret = []
  1594. for x in range(0, 3):
  1595. ret.append(
  1596. self.run_state(
  1597. "file.replace",
  1598. name=path_test,
  1599. pattern="^#foo=bar($|(?=\r\n))",
  1600. repl="foo=salt",
  1601. append_if_not_found=True,
  1602. )
  1603. )
  1604. # ensure, the resulting file contains the expected lines
  1605. self.assertTrue(filecmp.cmp(path_test, path_out))
  1606. # ensure the initial file was properly backed up
  1607. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1608. # ensure, all 'file.replace' runs reported success
  1609. for item in ret:
  1610. self.assertSaltTrueReturn(item)
  1611. @with_tempdir()
  1612. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1613. def test_replace_issue_18841_no_changes(self, base_dir):
  1614. """
  1615. Test the (mis-)behaviour of file.replace as described in #18841:
  1616. Using file.replace in a way which shouldn't modify the file at all
  1617. results in changed mtime of the original file and a backup file being created.
  1618. # Case description
  1619. The tested file contains multiple lines
  1620. The tested file contains a line already matching the replacement (no change needed)
  1621. The tested file's content shouldn't change at all
  1622. The tested file's mtime shouldn't change at all
  1623. No backup file should be created
  1624. """
  1625. test_name = "test_replace_issue_18841_no_changes"
  1626. path_in = os.path.join(
  1627. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1628. )
  1629. path_test = os.path.join(base_dir, test_name)
  1630. # create test file based on initial template
  1631. shutil.copyfile(path_in, path_test)
  1632. # get (m|a)time of file
  1633. fstats_orig = os.stat(path_test)
  1634. # define how far we predate the file
  1635. age = 5 * 24 * 60 * 60
  1636. # set (m|a)time of file 5 days into the past
  1637. os.utime(path_test, (fstats_orig.st_mtime - age, fstats_orig.st_atime - age))
  1638. ret = self.run_state(
  1639. "file.replace",
  1640. name=path_test,
  1641. pattern="^hello world$",
  1642. repl="goodbye world",
  1643. show_changes=True,
  1644. flags=["IGNORECASE"],
  1645. backup=False,
  1646. )
  1647. # get (m|a)time of file
  1648. fstats_post = os.stat(path_test)
  1649. # ensure, the file content didn't change
  1650. self.assertTrue(filecmp.cmp(path_in, path_test))
  1651. # ensure no backup file was created
  1652. self.assertFalse(os.path.exists(path_test + ".bak"))
  1653. # ensure the file's mtime didn't change
  1654. self.assertTrue(fstats_post.st_mtime, fstats_orig.st_mtime - age)
  1655. # ensure, all 'file.replace' runs reported success
  1656. self.assertSaltTrueReturn(ret)
  1657. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1658. def test_serialize(self):
  1659. """
  1660. Test to ensure that file.serialize returns a data structure that's
  1661. both serialized and formatted properly
  1662. """
  1663. path_test = os.path.join(RUNTIME_VARS.TMP, "test_serialize")
  1664. ret = self.run_state(
  1665. "file.serialize",
  1666. name=path_test,
  1667. dataset={
  1668. "name": "naive",
  1669. "description": "A basic test",
  1670. "a_list": ["first_element", "second_element"],
  1671. "finally": "the last item",
  1672. },
  1673. formatter="json",
  1674. )
  1675. with salt.utils.files.fopen(path_test, "rb") as fp_:
  1676. serialized_file = salt.utils.stringutils.to_unicode(fp_.read())
  1677. # The JSON serializer uses LF even on OSes where os.sep is CRLF.
  1678. expected_file = "\n".join(
  1679. [
  1680. "{",
  1681. ' "a_list": [',
  1682. ' "first_element",',
  1683. ' "second_element"',
  1684. " ],",
  1685. ' "description": "A basic test",',
  1686. ' "finally": "the last item",',
  1687. ' "name": "naive"',
  1688. "}",
  1689. "",
  1690. ]
  1691. )
  1692. self.assertEqual(serialized_file, expected_file)
  1693. @with_tempfile(create=False)
  1694. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1695. def test_serializer_deserializer_opts(self, name):
  1696. """
  1697. Test the serializer_opts and deserializer_opts options
  1698. """
  1699. data1 = {"foo": {"bar": "%(x)s"}}
  1700. data2 = {"foo": {"abc": 123}}
  1701. merged = {"foo": {"y": "not_used", "x": "baz", "abc": 123, "bar": "baz"}}
  1702. ret = self.run_state(
  1703. "file.serialize",
  1704. name=name,
  1705. dataset=data1,
  1706. formatter="configparser",
  1707. deserializer_opts=[{"defaults": {"y": "not_used"}}],
  1708. )
  1709. ret = ret[next(iter(ret))]
  1710. assert ret["result"], ret
  1711. # We should have warned about deserializer_opts being used when
  1712. # merge_if_exists was not set to True.
  1713. assert "warnings" in ret
  1714. # Run with merge_if_exists, as well as serializer and deserializer opts
  1715. # deserializer opts will be used for string interpolation of the %(x)s
  1716. # that was written to the file with data1 (i.e. bar should become baz)
  1717. ret = self.run_state(
  1718. "file.serialize",
  1719. name=name,
  1720. dataset=data2,
  1721. formatter="configparser",
  1722. merge_if_exists=True,
  1723. serializer_opts=[{"defaults": {"y": "not_used"}}],
  1724. deserializer_opts=[{"defaults": {"x": "baz"}}],
  1725. )
  1726. ret = ret[next(iter(ret))]
  1727. assert ret["result"], ret
  1728. with salt.utils.files.fopen(name) as fp_:
  1729. serialized_data = salt.serializers.configparser.deserialize(fp_)
  1730. # If this test fails, this debug logging will help tell us how the
  1731. # serialized data differs from what was serialized.
  1732. log.debug("serialized_data = %r", serialized_data)
  1733. log.debug("merged = %r", merged)
  1734. # serializing with a default of 'y' will add y = not_used into foo
  1735. assert serialized_data["foo"]["y"] == merged["foo"]["y"]
  1736. # deserializing with default of x = baz will perform interpolation on %(x)s
  1737. # and bar will then = baz
  1738. assert serialized_data["foo"]["bar"] == merged["foo"]["bar"]
  1739. @with_tempdir()
  1740. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1741. def test_replace_issue_18841_omit_backup(self, base_dir):
  1742. """
  1743. Test the (mis-)behaviour of file.replace as described in #18841:
  1744. Using file.replace in a way which shouldn't modify the file at all
  1745. results in changed mtime of the original file and a backup file being created.
  1746. # Case description
  1747. The tested file contains multiple lines
  1748. The tested file contains a line already matching the replacement (no change needed)
  1749. The tested file's content shouldn't change at all
  1750. The tested file's mtime shouldn't change at all
  1751. No backup file should be created, although backup=False isn't explicitly defined
  1752. """
  1753. test_name = "test_replace_issue_18841_omit_backup"
  1754. path_in = os.path.join(
  1755. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1756. )
  1757. path_test = os.path.join(base_dir, test_name)
  1758. # create test file based on initial template
  1759. shutil.copyfile(path_in, path_test)
  1760. # get (m|a)time of file
  1761. fstats_orig = os.stat(path_test)
  1762. # define how far we predate the file
  1763. age = 5 * 24 * 60 * 60
  1764. # set (m|a)time of file 5 days into the past
  1765. os.utime(path_test, (fstats_orig.st_mtime - age, fstats_orig.st_atime - age))
  1766. ret = self.run_state(
  1767. "file.replace",
  1768. name=path_test,
  1769. pattern="^hello world$",
  1770. repl="goodbye world",
  1771. show_changes=True,
  1772. flags=["IGNORECASE"],
  1773. )
  1774. # get (m|a)time of file
  1775. fstats_post = os.stat(path_test)
  1776. # ensure, the file content didn't change
  1777. self.assertTrue(filecmp.cmp(path_in, path_test))
  1778. # ensure no backup file was created
  1779. self.assertFalse(os.path.exists(path_test + ".bak"))
  1780. # ensure the file's mtime didn't change
  1781. self.assertTrue(fstats_post.st_mtime, fstats_orig.st_mtime - age)
  1782. # ensure, all 'file.replace' runs reported success
  1783. self.assertSaltTrueReturn(ret)
  1784. @with_tempfile()
  1785. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1786. def test_comment(self, name):
  1787. """
  1788. file.comment
  1789. """
  1790. # write a line to file
  1791. with salt.utils.files.fopen(name, "w+") as fp_:
  1792. fp_.write("comment_me")
  1793. # Look for changes with test=True: return should be "None" at the first run
  1794. ret = self.run_state("file.comment", test=True, name=name, regex="^comment")
  1795. self.assertSaltNoneReturn(ret)
  1796. # comment once
  1797. ret = self.run_state("file.comment", name=name, regex="^comment")
  1798. # result is positive
  1799. self.assertSaltTrueReturn(ret)
  1800. # line is commented
  1801. with salt.utils.files.fopen(name, "r") as fp_:
  1802. self.assertTrue(fp_.read().startswith("#comment"))
  1803. # comment twice
  1804. ret = self.run_state("file.comment", name=name, regex="^comment")
  1805. # result is still positive
  1806. self.assertSaltTrueReturn(ret)
  1807. # line is still commented
  1808. with salt.utils.files.fopen(name, "r") as fp_:
  1809. self.assertTrue(fp_.read().startswith("#comment"))
  1810. # Test previously commented file returns "True" now and not "None" with test=True
  1811. ret = self.run_state("file.comment", test=True, name=name, regex="^comment")
  1812. self.assertSaltTrueReturn(ret)
  1813. @with_tempfile()
  1814. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1815. def test_test_comment(self, name):
  1816. """
  1817. file.comment test interface
  1818. """
  1819. with salt.utils.files.fopen(name, "w+") as fp_:
  1820. fp_.write("comment_me")
  1821. ret = self.run_state("file.comment", test=True, name=name, regex=".*comment.*",)
  1822. with salt.utils.files.fopen(name, "r") as fp_:
  1823. self.assertNotIn("#comment", fp_.read())
  1824. self.assertSaltNoneReturn(ret)
  1825. @with_tempfile()
  1826. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1827. def test_uncomment(self, name):
  1828. """
  1829. file.uncomment
  1830. """
  1831. with salt.utils.files.fopen(name, "w+") as fp_:
  1832. fp_.write("#comment_me")
  1833. ret = self.run_state("file.uncomment", name=name, regex="^comment")
  1834. with salt.utils.files.fopen(name, "r") as fp_:
  1835. self.assertNotIn("#comment", fp_.read())
  1836. self.assertSaltTrueReturn(ret)
  1837. @with_tempfile()
  1838. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1839. def test_test_uncomment(self, name):
  1840. """
  1841. file.comment test interface
  1842. """
  1843. with salt.utils.files.fopen(name, "w+") as fp_:
  1844. fp_.write("#comment_me")
  1845. ret = self.run_state("file.uncomment", test=True, name=name, regex="^comment.*")
  1846. with salt.utils.files.fopen(name, "r") as fp_:
  1847. self.assertIn("#comment", fp_.read())
  1848. self.assertSaltNoneReturn(ret)
  1849. @with_tempfile()
  1850. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1851. def test_append(self, name):
  1852. """
  1853. file.append
  1854. """
  1855. with salt.utils.files.fopen(name, "w+") as fp_:
  1856. fp_.write("#salty!")
  1857. ret = self.run_state("file.append", name=name, text="cheese")
  1858. with salt.utils.files.fopen(name, "r") as fp_:
  1859. self.assertIn("cheese", fp_.read())
  1860. self.assertSaltTrueReturn(ret)
  1861. @with_tempfile()
  1862. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1863. def test_test_append(self, name):
  1864. """
  1865. file.append test interface
  1866. """
  1867. with salt.utils.files.fopen(name, "w+") as fp_:
  1868. fp_.write("#salty!")
  1869. ret = self.run_state("file.append", test=True, name=name, text="cheese")
  1870. with salt.utils.files.fopen(name, "r") as fp_:
  1871. self.assertNotIn("cheese", fp_.read())
  1872. self.assertSaltNoneReturn(ret)
  1873. @with_tempdir()
  1874. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1875. def test_append_issue_1864_makedirs(self, base_dir):
  1876. """
  1877. file.append but create directories if needed as an option, and create
  1878. the file if it doesn't exist
  1879. """
  1880. fname = "append_issue_1864_makedirs"
  1881. name = os.path.join(base_dir, fname)
  1882. # Non existing file get's touched
  1883. ret = self.run_state("file.append", name=name, text="cheese", makedirs=True)
  1884. self.assertSaltTrueReturn(ret)
  1885. # Nested directory and file get's touched
  1886. name = os.path.join(base_dir, "issue_1864", fname)
  1887. ret = self.run_state("file.append", name=name, text="cheese", makedirs=True)
  1888. self.assertSaltTrueReturn(ret)
  1889. # Parent directory exists but file does not and makedirs is False
  1890. name = os.path.join(base_dir, "issue_1864", fname + "2")
  1891. ret = self.run_state("file.append", name=name, text="cheese")
  1892. self.assertSaltTrueReturn(ret)
  1893. self.assertTrue(os.path.isfile(name))
  1894. @with_tempdir()
  1895. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1896. def test_prepend_issue_27401_makedirs(self, base_dir):
  1897. """
  1898. file.prepend but create directories if needed as an option, and create
  1899. the file if it doesn't exist
  1900. """
  1901. fname = "prepend_issue_27401"
  1902. name = os.path.join(base_dir, fname)
  1903. # Non existing file get's touched
  1904. ret = self.run_state("file.prepend", name=name, text="cheese", makedirs=True)
  1905. self.assertSaltTrueReturn(ret)
  1906. # Nested directory and file get's touched
  1907. name = os.path.join(base_dir, "issue_27401", fname)
  1908. ret = self.run_state("file.prepend", name=name, text="cheese", makedirs=True)
  1909. self.assertSaltTrueReturn(ret)
  1910. # Parent directory exists but file does not and makedirs is False
  1911. name = os.path.join(base_dir, "issue_27401", fname + "2")
  1912. ret = self.run_state("file.prepend", name=name, text="cheese")
  1913. self.assertSaltTrueReturn(ret)
  1914. self.assertTrue(os.path.isfile(name))
  1915. @with_tempfile()
  1916. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1917. def test_touch(self, name):
  1918. """
  1919. file.touch
  1920. """
  1921. ret = self.run_state("file.touch", name=name)
  1922. self.assertTrue(os.path.isfile(name))
  1923. self.assertSaltTrueReturn(ret)
  1924. @with_tempfile(create=False)
  1925. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1926. def test_test_touch(self, name):
  1927. """
  1928. file.touch test interface
  1929. """
  1930. ret = self.run_state("file.touch", test=True, name=name)
  1931. self.assertFalse(os.path.isfile(name))
  1932. self.assertSaltNoneReturn(ret)
  1933. @with_tempdir()
  1934. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  1935. def test_touch_directory(self, base_dir):
  1936. """
  1937. file.touch a directory
  1938. """
  1939. name = os.path.join(base_dir, "touch_test_dir")
  1940. os.mkdir(name)
  1941. ret = self.run_state("file.touch", name=name)
  1942. self.assertSaltTrueReturn(ret)
  1943. self.assertTrue(os.path.isdir(name))
  1944. @with_tempdir()
  1945. @pytest.mark.slow_test(seconds=120) # Test takes >60 and <=120 seconds
  1946. def test_issue_2227_file_append(self, base_dir):
  1947. """
  1948. Text to append includes a percent symbol
  1949. """
  1950. # let's make use of existing state to create a file with contents to
  1951. # test against
  1952. tmp_file_append = os.path.join(base_dir, "test.append")
  1953. self.run_state("file.touch", name=tmp_file_append)
  1954. self.run_state(
  1955. "file.append", name=tmp_file_append, source="salt://testappend/firstif"
  1956. )
  1957. self.run_state(
  1958. "file.append", name=tmp_file_append, source="salt://testappend/secondif"
  1959. )
  1960. # Now our real test
  1961. try:
  1962. ret = self.run_state(
  1963. "file.append", name=tmp_file_append, text="HISTTIMEFORMAT='%F %T '"
  1964. )
  1965. self.assertSaltTrueReturn(ret)
  1966. with salt.utils.files.fopen(tmp_file_append, "r") as fp_:
  1967. contents_pre = fp_.read()
  1968. # It should not append text again
  1969. ret = self.run_state(
  1970. "file.append", name=tmp_file_append, text="HISTTIMEFORMAT='%F %T '"
  1971. )
  1972. self.assertSaltTrueReturn(ret)
  1973. with salt.utils.files.fopen(tmp_file_append, "r") as fp_:
  1974. contents_post = fp_.read()
  1975. self.assertEqual(contents_pre, contents_post)
  1976. except AssertionError:
  1977. if os.path.exists(tmp_file_append):
  1978. shutil.copy(tmp_file_append, tmp_file_append + ".bak")
  1979. raise
  1980. @with_tempdir()
  1981. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  1982. def test_issue_2401_file_comment(self, base_dir):
  1983. # Get a path to the temporary file
  1984. tmp_file = os.path.join(base_dir, "issue-2041-comment.txt")
  1985. # Write some data to it
  1986. with salt.utils.files.fopen(tmp_file, "w") as fp_:
  1987. fp_.write("hello\nworld\n")
  1988. # create the sls template
  1989. template_lines = [
  1990. "{0}:".format(tmp_file),
  1991. " file.comment:",
  1992. " - regex: ^world",
  1993. ]
  1994. template = "\n".join(template_lines)
  1995. try:
  1996. ret = self.run_function("state.template_str", [template], timeout=120)
  1997. self.assertSaltTrueReturn(ret)
  1998. self.assertNotInSaltComment("Pattern already commented", ret)
  1999. self.assertInSaltComment("Commented lines successfully", ret)
  2000. # This next time, it is already commented.
  2001. ret = self.run_function("state.template_str", [template], timeout=120)
  2002. self.assertSaltTrueReturn(ret)
  2003. self.assertInSaltComment("Pattern already commented", ret)
  2004. except AssertionError:
  2005. shutil.copy(tmp_file, tmp_file + ".bak")
  2006. raise
  2007. @with_tempdir()
  2008. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2009. def test_issue_2379_file_append(self, base_dir):
  2010. # Get a path to the temporary file
  2011. tmp_file = os.path.join(base_dir, "issue-2379-file-append.txt")
  2012. # Write some data to it
  2013. with salt.utils.files.fopen(tmp_file, "w") as fp_:
  2014. fp_.write(
  2015. "hello\nworld\n" # Some junk
  2016. "#PermitRootLogin yes\n" # Commented text
  2017. "# PermitRootLogin yes\n" # Commented text with space
  2018. )
  2019. # create the sls template
  2020. template_lines = [
  2021. "{0}:".format(tmp_file),
  2022. " file.append:",
  2023. " - text: PermitRootLogin yes",
  2024. ]
  2025. template = "\n".join(template_lines)
  2026. try:
  2027. ret = self.run_function("state.template_str", [template])
  2028. self.assertSaltTrueReturn(ret)
  2029. self.assertInSaltComment("Appended 1 lines", ret)
  2030. except AssertionError:
  2031. shutil.copy(tmp_file, tmp_file + ".bak")
  2032. raise
  2033. @skipIf(IS_WINDOWS, "Mode not available in Windows")
  2034. @with_tempdir(create=False)
  2035. @with_tempdir(create=False)
  2036. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  2037. def test_issue_2726_mode_kwarg(self, dir1, dir2):
  2038. # Let's test for the wrong usage approach
  2039. bad_mode_kwarg_testfile = os.path.join(dir1, "bad_mode_kwarg", "testfile")
  2040. bad_template = [
  2041. "{0}:".format(bad_mode_kwarg_testfile),
  2042. " file.recurse:",
  2043. " - source: salt://testfile",
  2044. " - mode: 644",
  2045. ]
  2046. ret = self.run_function("state.template_str", [os.linesep.join(bad_template)])
  2047. self.assertSaltFalseReturn(ret)
  2048. self.assertInSaltComment(
  2049. "'mode' is not allowed in 'file.recurse'. Please use "
  2050. "'file_mode' and 'dir_mode'.",
  2051. ret,
  2052. )
  2053. self.assertNotInSaltComment(
  2054. "TypeError: managed() got multiple values for keyword " "argument 'mode'",
  2055. ret,
  2056. )
  2057. # Now, the correct usage approach
  2058. good_mode_kwargs_testfile = os.path.join(dir2, "good_mode_kwargs", "testappend")
  2059. good_template = [
  2060. "{0}:".format(good_mode_kwargs_testfile),
  2061. " file.recurse:",
  2062. " - source: salt://testappend",
  2063. " - dir_mode: 744",
  2064. " - file_mode: 644",
  2065. ]
  2066. ret = self.run_function("state.template_str", [os.linesep.join(good_template)])
  2067. self.assertSaltTrueReturn(ret)
  2068. @with_tempdir()
  2069. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2070. def test_issue_8343_accumulated_require_in(self, base_dir):
  2071. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8343.sls")
  2072. testcase_filedest = os.path.join(base_dir, "issue-8343.txt")
  2073. if os.path.exists(template_path):
  2074. os.remove(template_path)
  2075. if os.path.exists(testcase_filedest):
  2076. os.remove(testcase_filedest)
  2077. sls_template = [
  2078. "{0}:",
  2079. " file.managed:",
  2080. " - contents: |",
  2081. " #",
  2082. "",
  2083. "prepend-foo-accumulator-from-pillar:",
  2084. " file.accumulated:",
  2085. " - require_in:",
  2086. " - file: prepend-foo-management",
  2087. " - filename: {0}",
  2088. " - text: |",
  2089. " foo",
  2090. "",
  2091. "append-foo-accumulator-from-pillar:",
  2092. " file.accumulated:",
  2093. " - require_in:",
  2094. " - file: append-foo-management",
  2095. " - filename: {0}",
  2096. " - text: |",
  2097. " bar",
  2098. "",
  2099. "prepend-foo-management:",
  2100. " file.blockreplace:",
  2101. " - name: {0}",
  2102. ' - marker_start: "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT"',
  2103. ' - marker_end: "#-- end salt managed zonestart --"',
  2104. " - content: ''",
  2105. " - prepend_if_not_found: True",
  2106. " - backup: '.bak'",
  2107. " - show_changes: True",
  2108. "",
  2109. "append-foo-management:",
  2110. " file.blockreplace:",
  2111. " - name: {0}",
  2112. ' - marker_start: "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT"',
  2113. ' - marker_end: "#-- end salt managed zoneend --"',
  2114. " - content: ''",
  2115. " - append_if_not_found: True",
  2116. " - backup: '.bak2'",
  2117. " - show_changes: True",
  2118. "",
  2119. ]
  2120. with salt.utils.files.fopen(template_path, "w") as fp_:
  2121. fp_.write(os.linesep.join(sls_template).format(testcase_filedest))
  2122. ret = self.run_function("state.sls", mods="issue-8343")
  2123. for name, step in six.iteritems(ret):
  2124. self.assertSaltTrueReturn({name: step})
  2125. with salt.utils.files.fopen(testcase_filedest) as fp_:
  2126. contents = fp_.read().split(os.linesep)
  2127. expected = [
  2128. "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT",
  2129. "foo",
  2130. "#-- end salt managed zonestart --",
  2131. "#",
  2132. "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT",
  2133. "bar",
  2134. "#-- end salt managed zoneend --",
  2135. "",
  2136. ]
  2137. self.assertEqual(
  2138. [salt.utils.stringutils.to_str(line) for line in expected], contents
  2139. )
  2140. @with_tempdir()
  2141. @skipIf(
  2142. salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
  2143. )
  2144. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  2145. def test_issue_11003_immutable_lazy_proxy_sum(self, base_dir):
  2146. # causes the Import-Module ServerManager error on Windows
  2147. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-11003.sls")
  2148. testcase_filedest = os.path.join(base_dir, "issue-11003.txt")
  2149. sls_template = [
  2150. "a{0}:",
  2151. " file.absent:",
  2152. " - name: {0}",
  2153. "",
  2154. "{0}:",
  2155. " file.managed:",
  2156. " - contents: |",
  2157. " #",
  2158. "",
  2159. "test-acc1:",
  2160. " file.accumulated:",
  2161. " - require_in:",
  2162. " - file: final",
  2163. " - filename: {0}",
  2164. " - text: |",
  2165. " bar",
  2166. "",
  2167. "test-acc2:",
  2168. " file.accumulated:",
  2169. " - watch_in:",
  2170. " - file: final",
  2171. " - filename: {0}",
  2172. " - text: |",
  2173. " baz",
  2174. "",
  2175. "final:",
  2176. " file.blockreplace:",
  2177. " - name: {0}",
  2178. ' - marker_start: "#-- start managed zone PLEASE, DO NOT EDIT"',
  2179. ' - marker_end: "#-- end managed zone"',
  2180. " - content: ''",
  2181. " - append_if_not_found: True",
  2182. " - show_changes: True",
  2183. ]
  2184. with salt.utils.files.fopen(template_path, "w") as fp_:
  2185. fp_.write(os.linesep.join(sls_template).format(testcase_filedest))
  2186. ret = self.run_function("state.sls", mods="issue-11003", timeout=600)
  2187. for name, step in six.iteritems(ret):
  2188. self.assertSaltTrueReturn({name: step})
  2189. with salt.utils.files.fopen(testcase_filedest) as fp_:
  2190. contents = fp_.read().split(os.linesep)
  2191. begin = contents.index("#-- start managed zone PLEASE, DO NOT EDIT") + 1
  2192. end = contents.index("#-- end managed zone")
  2193. block_contents = contents[begin:end]
  2194. for item in ("", "bar", "baz"):
  2195. block_contents.remove(item)
  2196. self.assertEqual(block_contents, [])
  2197. @with_tempdir()
  2198. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2199. def test_issue_8947_utf8_sls(self, base_dir):
  2200. """
  2201. Test some file operation with utf-8 characters on the sls
  2202. This is more generic than just a file test. Feel free to move
  2203. """
  2204. self.maxDiff = None
  2205. korean_1 = "한국어 시험"
  2206. korean_2 = "첫 번째 행"
  2207. korean_3 = "마지막 행"
  2208. test_file = os.path.join(base_dir, "{0}.txt".format(korean_1))
  2209. test_file_encoded = test_file
  2210. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8947.sls")
  2211. # create the sls template
  2212. template = textwrap.dedent(
  2213. """\
  2214. some-utf8-file-create:
  2215. file.managed:
  2216. - name: {test_file}
  2217. - contents: {korean_1}
  2218. - makedirs: True
  2219. - replace: True
  2220. - show_diff: True
  2221. some-utf8-file-create2:
  2222. file.managed:
  2223. - name: {test_file}
  2224. - contents: |
  2225. {korean_2}
  2226. {korean_1}
  2227. {korean_3}
  2228. - replace: True
  2229. - show_diff: True
  2230. """.format(
  2231. **locals()
  2232. )
  2233. )
  2234. if not salt.utils.platform.is_windows():
  2235. template += textwrap.dedent(
  2236. """\
  2237. some-utf8-file-content-test:
  2238. cmd.run:
  2239. - name: 'cat "{test_file}"'
  2240. - require:
  2241. - file: some-utf8-file-create2
  2242. """.format(
  2243. **locals()
  2244. )
  2245. )
  2246. # Save template file
  2247. with salt.utils.files.fopen(template_path, "wb") as fp_:
  2248. fp_.write(salt.utils.stringutils.to_bytes(template))
  2249. try:
  2250. result = self.run_function("state.sls", mods="issue-8947")
  2251. if not isinstance(result, dict):
  2252. raise AssertionError(
  2253. (
  2254. "Something went really wrong while testing this sls:" " {0}"
  2255. ).format(repr(result))
  2256. )
  2257. # difflib produces different output on python 2.6 than on >=2.7
  2258. if sys.version_info < (2, 7):
  2259. diff = "--- \n+++ \n@@ -1,1 +1,3 @@\n"
  2260. else:
  2261. diff = "--- \n+++ \n@@ -1 +1,3 @@\n"
  2262. diff += ("+첫 번째 행{0}" " 한국어 시험{0}" "+마지막 행{0}").format(os.linesep)
  2263. ret = {x.split("_|-")[1]: y for x, y in six.iteritems(result)}
  2264. # Confirm initial creation of file
  2265. self.assertEqual(
  2266. ret["some-utf8-file-create"]["comment"],
  2267. "File {0} updated".format(test_file_encoded),
  2268. )
  2269. self.assertEqual(
  2270. ret["some-utf8-file-create"]["changes"], {"diff": "New file"}
  2271. )
  2272. # Confirm file was modified and that the diff was as expected
  2273. self.assertEqual(
  2274. ret["some-utf8-file-create2"]["comment"],
  2275. "File {0} updated".format(test_file_encoded),
  2276. )
  2277. self.assertEqual(ret["some-utf8-file-create2"]["changes"], {"diff": diff})
  2278. if salt.utils.platform.is_windows():
  2279. import subprocess
  2280. import win32api
  2281. p = subprocess.Popen(
  2282. salt.utils.stringutils.to_str(
  2283. "type {}".format(win32api.GetShortPathName(test_file))
  2284. ),
  2285. shell=True,
  2286. stdout=subprocess.PIPE,
  2287. stderr=subprocess.PIPE,
  2288. )
  2289. p.poll()
  2290. out = p.stdout.read()
  2291. self.assertEqual(
  2292. out.decode("utf-8"),
  2293. os.linesep.join((korean_2, korean_1, korean_3)) + os.linesep,
  2294. )
  2295. else:
  2296. self.assertEqual(
  2297. ret["some-utf8-file-content-test"]["comment"],
  2298. 'Command "cat "{0}"" run'.format(test_file_encoded),
  2299. )
  2300. self.assertEqual(
  2301. ret["some-utf8-file-content-test"]["changes"]["stdout"],
  2302. "\n".join((korean_2, korean_1, korean_3)),
  2303. )
  2304. finally:
  2305. try:
  2306. os.remove(template_path)
  2307. except OSError:
  2308. pass
  2309. @pytest.mark.skip_if_not_root
  2310. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2311. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2312. @with_system_user_and_group(
  2313. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2314. )
  2315. @with_tempdir()
  2316. @pytest.mark.slow_test(seconds=10) # Test takes >5 and <=10 seconds
  2317. def test_issue_12209_follow_symlinks(self, tempdir, user, group):
  2318. """
  2319. Ensure that symlinks are properly chowned when recursing (following
  2320. symlinks)
  2321. """
  2322. # Make the directories for this test
  2323. onedir = os.path.join(tempdir, "one")
  2324. twodir = os.path.join(tempdir, "two")
  2325. os.mkdir(onedir)
  2326. os.symlink(onedir, twodir)
  2327. # Run the state
  2328. ret = self.run_state(
  2329. "file.directory",
  2330. name=tempdir,
  2331. follow_symlinks=True,
  2332. user=user,
  2333. group=group,
  2334. recurse=["user", "group"],
  2335. )
  2336. self.assertSaltTrueReturn(ret)
  2337. # Double-check, in case state mis-reported a True result. Since we are
  2338. # following symlinks, we expect twodir to still be owned by root, but
  2339. # onedir should be owned by the 'issue12209' user.
  2340. onestats = os.stat(onedir)
  2341. twostats = os.lstat(twodir)
  2342. self.assertEqual(pwd.getpwuid(onestats.st_uid).pw_name, user)
  2343. self.assertEqual(pwd.getpwuid(twostats.st_uid).pw_name, "root")
  2344. self.assertEqual(grp.getgrgid(onestats.st_gid).gr_name, group)
  2345. if salt.utils.path.which("id"):
  2346. root_group = self.run_function("user.primary_group", ["root"])
  2347. self.assertEqual(grp.getgrgid(twostats.st_gid).gr_name, root_group)
  2348. @pytest.mark.skip_if_not_root
  2349. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2350. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2351. @with_system_user_and_group(
  2352. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2353. )
  2354. @with_tempdir()
  2355. @pytest.mark.slow_test(seconds=10) # Test takes >5 and <=10 seconds
  2356. def test_issue_12209_no_follow_symlinks(self, tempdir, user, group):
  2357. """
  2358. Ensure that symlinks are properly chowned when recursing (not following
  2359. symlinks)
  2360. """
  2361. # Make the directories for this test
  2362. onedir = os.path.join(tempdir, "one")
  2363. twodir = os.path.join(tempdir, "two")
  2364. os.mkdir(onedir)
  2365. os.symlink(onedir, twodir)
  2366. # Run the state
  2367. ret = self.run_state(
  2368. "file.directory",
  2369. name=tempdir,
  2370. follow_symlinks=False,
  2371. user=user,
  2372. group=group,
  2373. recurse=["user", "group"],
  2374. )
  2375. self.assertSaltTrueReturn(ret)
  2376. # Double-check, in case state mis-reported a True result. Since we
  2377. # are not following symlinks, we expect twodir to now be owned by
  2378. # the 'issue12209' user, just link onedir.
  2379. onestats = os.stat(onedir)
  2380. twostats = os.lstat(twodir)
  2381. self.assertEqual(pwd.getpwuid(onestats.st_uid).pw_name, user)
  2382. self.assertEqual(pwd.getpwuid(twostats.st_uid).pw_name, user)
  2383. self.assertEqual(grp.getgrgid(onestats.st_gid).gr_name, group)
  2384. self.assertEqual(grp.getgrgid(twostats.st_gid).gr_name, group)
  2385. @with_tempfile(create=False)
  2386. @with_tempfile()
  2387. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  2388. def test_template_local_file(self, source, dest):
  2389. """
  2390. Test a file.managed state with a local file as the source. Test both
  2391. with the file:// protocol designation prepended, and without it.
  2392. """
  2393. with salt.utils.files.fopen(source, "w") as fp_:
  2394. fp_.write("{{ foo }}\n")
  2395. for prefix in ("file://", ""):
  2396. ret = self.run_state(
  2397. "file.managed",
  2398. name=dest,
  2399. source=prefix + source,
  2400. template="jinja",
  2401. context={"foo": "Hello world!"},
  2402. )
  2403. self.assertSaltTrueReturn(ret)
  2404. @with_tempfile()
  2405. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2406. def test_template_local_file_noclobber(self, source):
  2407. """
  2408. Test the case where a source file is in the minion's local filesystem,
  2409. and the source path is the same as the destination path.
  2410. """
  2411. with salt.utils.files.fopen(source, "w") as fp_:
  2412. fp_.write("{{ foo }}\n")
  2413. ret = self.run_state(
  2414. "file.managed",
  2415. name=source,
  2416. source=source,
  2417. template="jinja",
  2418. context={"foo": "Hello world!"},
  2419. )
  2420. self.assertSaltFalseReturn(ret)
  2421. self.assertIn(
  2422. ("Source file cannot be the same as destination"),
  2423. ret[next(iter(ret))]["comment"],
  2424. )
  2425. @with_tempfile(create=False)
  2426. @with_tempfile(create=False)
  2427. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2428. def test_issue_25250_force_copy_deletes(self, source, dest):
  2429. """
  2430. ensure force option in copy state does not delete target file
  2431. """
  2432. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "hosts"), source)
  2433. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "file/base/cheese"), dest)
  2434. self.run_state("file.copy", name=dest, source=source, force=True)
  2435. self.assertTrue(os.path.exists(dest))
  2436. self.assertTrue(filecmp.cmp(source, dest))
  2437. os.remove(source)
  2438. os.remove(dest)
  2439. @pytest.mark.skip_if_not_root
  2440. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  2441. @pytest.mark.destructive_test
  2442. @with_tempfile()
  2443. @pytest.mark.slow_test(seconds=10) # Test takes >5 and <=10 seconds
  2444. def test_file_copy_make_dirs(self, source):
  2445. """
  2446. ensure make_dirs creates correct user perms
  2447. """
  2448. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "hosts"), source)
  2449. dest = os.path.join(RUNTIME_VARS.TMP, "dir1", "dir2", "copied_file.txt")
  2450. user = "salt"
  2451. mode = "0644"
  2452. ret = self.run_function("user.add", [user])
  2453. self.assertTrue(ret, "Failed to add user. Are you running as sudo?")
  2454. ret = self.run_state(
  2455. "file.copy", name=dest, source=source, user=user, makedirs=True, mode=mode
  2456. )
  2457. self.assertSaltTrueReturn(ret)
  2458. file_checks = [
  2459. dest,
  2460. os.path.join(RUNTIME_VARS.TMP, "dir1"),
  2461. os.path.join(RUNTIME_VARS.TMP, "dir1", "dir2"),
  2462. ]
  2463. for check in file_checks:
  2464. user_check = self.run_function("file.get_user", [check])
  2465. mode_check = self.run_function("file.get_mode", [check])
  2466. self.assertEqual(user_check, user)
  2467. self.assertEqual(salt.utils.files.normalize_mode(mode_check), mode)
  2468. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2469. def test_contents_pillar_with_pillar_list(self):
  2470. """
  2471. This tests for any regressions for this issue:
  2472. https://github.com/saltstack/salt/issues/30934
  2473. """
  2474. state_file = "file_contents_pillar"
  2475. ret = self.run_function("state.sls", mods=state_file)
  2476. self.assertSaltTrueReturn(ret)
  2477. @pytest.mark.skip_if_not_root
  2478. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2479. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2480. @with_system_user_and_group(
  2481. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2482. )
  2483. @pytest.mark.slow_test(seconds=10) # Test takes >5 and <=10 seconds
  2484. def test_owner_after_setuid(self, user, group):
  2485. """
  2486. Test to check file user/group after setting setuid or setgid.
  2487. Because Python os.chown() does reset the setuid/setgid to 0.
  2488. https://github.com/saltstack/salt/pull/45257
  2489. """
  2490. # Desired configuration.
  2491. desired = {
  2492. "file": os.path.join(RUNTIME_VARS.TMP, "file_with_setuid"),
  2493. "user": user,
  2494. "group": group,
  2495. "mode": "4750",
  2496. }
  2497. # Run the state.
  2498. ret = self.run_state(
  2499. "file.managed",
  2500. name=desired["file"],
  2501. user=desired["user"],
  2502. group=desired["group"],
  2503. mode=desired["mode"],
  2504. )
  2505. # Check result.
  2506. file_stat = os.stat(desired["file"])
  2507. result = {
  2508. "user": pwd.getpwuid(file_stat.st_uid).pw_name,
  2509. "group": grp.getgrgid(file_stat.st_gid).gr_name,
  2510. "mode": oct(stat.S_IMODE(file_stat.st_mode)),
  2511. }
  2512. self.assertSaltTrueReturn(ret)
  2513. self.assertEqual(desired["user"], result["user"])
  2514. self.assertEqual(desired["group"], result["group"])
  2515. self.assertEqual(desired["mode"], result["mode"].lstrip("0Oo"))
  2516. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2517. def test_binary_contents(self):
  2518. """
  2519. This tests to ensure that binary contents do not cause a traceback.
  2520. """
  2521. name = os.path.join(RUNTIME_VARS.TMP, "1px.gif")
  2522. try:
  2523. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2524. self.assertSaltTrueReturn(ret)
  2525. finally:
  2526. try:
  2527. os.remove(name)
  2528. except OSError:
  2529. pass
  2530. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  2531. def test_binary_contents_twice(self):
  2532. """
  2533. This test ensures that after a binary file is created, salt can confirm
  2534. that the file is in the correct state.
  2535. """
  2536. # Create a binary file
  2537. name = os.path.join(RUNTIME_VARS.TMP, "1px.gif")
  2538. # First run state ensures file is created
  2539. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2540. self.assertSaltTrueReturn(ret)
  2541. # Second run of state ensures file is in correct state
  2542. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2543. self.assertSaltTrueReturn(ret)
  2544. try:
  2545. os.remove(name)
  2546. except OSError:
  2547. pass
  2548. @pytest.mark.skip_if_not_root
  2549. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2550. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2551. @with_system_user_and_group(
  2552. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2553. )
  2554. @with_tempdir()
  2555. @pytest.mark.slow_test(seconds=10) # Test takes >5 and <=10 seconds
  2556. def test_issue_48336_file_managed_mode_setuid(self, tempdir, user, group):
  2557. """
  2558. Ensure that mode is correct with changing of ownership and group
  2559. symlinks)
  2560. """
  2561. tempfile = os.path.join(tempdir, "temp_file_issue_48336")
  2562. # Run the state
  2563. ret = self.run_state(
  2564. "file.managed", name=tempfile, user=user, group=group, mode="4750",
  2565. )
  2566. self.assertSaltTrueReturn(ret)
  2567. # Check that the owner and group are correct, and
  2568. # the mode is what we expect
  2569. temp_file_stats = os.stat(tempfile)
  2570. # Normalize the mode
  2571. temp_file_mode = str(oct(stat.S_IMODE(temp_file_stats.st_mode)))
  2572. temp_file_mode = salt.utils.files.normalize_mode(temp_file_mode)
  2573. self.assertEqual(temp_file_mode, "4750")
  2574. self.assertEqual(pwd.getpwuid(temp_file_stats.st_uid).pw_name, user)
  2575. self.assertEqual(grp.getgrgid(temp_file_stats.st_gid).gr_name, group)
  2576. @with_tempdir()
  2577. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2578. def test_issue_48557(self, tempdir):
  2579. tempfile = os.path.join(tempdir, "temp_file_issue_48557")
  2580. with salt.utils.files.fopen(tempfile, "wb") as fp:
  2581. fp.write(os.linesep.join(["test1", "test2", "test3", ""]).encode("utf-8"))
  2582. ret = self.run_state(
  2583. "file.line", name=tempfile, after="test2", mode="insert", content="test4"
  2584. )
  2585. self.assertSaltTrueReturn(ret)
  2586. with salt.utils.files.fopen(tempfile, "rb") as fp:
  2587. content = fp.read()
  2588. self.assertEqual(
  2589. content,
  2590. os.linesep.join(["test1", "test2", "test4", "test3", ""]).encode("utf-8"),
  2591. )
  2592. @with_tempfile()
  2593. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  2594. def test_issue_50221(self, name):
  2595. expected = "abc{0}{0}{0}".format(os.linesep)
  2596. ret = self.run_function("pillar.get", ["issue-50221"])
  2597. assert ret == expected
  2598. ret = self.run_function("state.apply", ["issue-50221"], pillar={"name": name},)
  2599. self.assertSaltTrueReturn(ret)
  2600. with salt.utils.files.fopen(name, "r") as fp:
  2601. contents = fp.read()
  2602. assert contents == expected
  2603. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2604. def test_managed_file_issue_51208(self):
  2605. """
  2606. Test to ensure we can handle a file with escaped double-quotes
  2607. """
  2608. name = os.path.join(RUNTIME_VARS.TMP, "issue_51208.txt")
  2609. ret = self.run_state(
  2610. "file.managed", name=name, source="salt://issue-51208/vimrc.stub"
  2611. )
  2612. src = os.path.join(RUNTIME_VARS.BASE_FILES, "issue-51208", "vimrc.stub")
  2613. with salt.utils.files.fopen(src, "r") as fp_:
  2614. master_data = fp_.read()
  2615. with salt.utils.files.fopen(name, "r") as fp_:
  2616. minion_data = fp_.read()
  2617. self.assertEqual(master_data, minion_data)
  2618. self.assertSaltTrueReturn(ret)
  2619. @with_tempfile()
  2620. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  2621. def test_keyvalue(self, name):
  2622. """
  2623. file.keyvalue
  2624. """
  2625. content = dedent(
  2626. """\
  2627. # This is the sshd server system-wide configuration file. See
  2628. # sshd_config(5) for more information.
  2629. # The strategy used for options in the default sshd_config shipped with
  2630. # OpenSSH is to specify options with their default value where
  2631. # possible, but leave them commented. Uncommented options override the
  2632. # default value.
  2633. #Port 22
  2634. #AddressFamily any
  2635. #ListenAddress 0.0.0.0
  2636. #ListenAddress ::
  2637. #HostKey /etc/ssh/ssh_host_rsa_key
  2638. #HostKey /etc/ssh/ssh_host_ecdsa_key
  2639. #HostKey /etc/ssh/ssh_host_ed25519_key
  2640. # Ciphers and keying
  2641. #RekeyLimit default none
  2642. # Logging
  2643. #SyslogFacility AUTH
  2644. #LogLevel INFO
  2645. # Authentication:
  2646. #LoginGraceTime 2m
  2647. #PermitRootLogin prohibit-password
  2648. #StrictModes yes
  2649. #MaxAuthTries 6
  2650. #MaxSessions 10
  2651. """
  2652. )
  2653. with salt.utils.files.fopen(name, "w+") as fp_:
  2654. fp_.write(content)
  2655. ret = self.run_state(
  2656. "file.keyvalue",
  2657. name=name,
  2658. key="permitrootlogin",
  2659. value="no",
  2660. separator=" ",
  2661. uncomment=" #",
  2662. key_ignore_case=True,
  2663. )
  2664. with salt.utils.files.fopen(name, "r") as fp_:
  2665. file_contents = fp_.read()
  2666. self.assertNotIn("#PermitRootLogin", file_contents)
  2667. self.assertNotIn("prohibit-password", file_contents)
  2668. self.assertIn("PermitRootLogin no", file_contents)
  2669. self.assertSaltTrueReturn(ret)
  2670. @pytest.mark.windows_whitelisted
  2671. class BlockreplaceTest(ModuleCase, SaltReturnAssertsMixin):
  2672. marker_start = "# start"
  2673. marker_end = "# end"
  2674. content = dedent(
  2675. """\
  2676. Line 1 of block
  2677. Line 2 of block
  2678. """
  2679. )
  2680. without_block = dedent(
  2681. """\
  2682. Hello world!
  2683. # comment here
  2684. """
  2685. )
  2686. with_non_matching_block = dedent(
  2687. """\
  2688. Hello world!
  2689. # start
  2690. No match here
  2691. # end
  2692. # comment here
  2693. """
  2694. )
  2695. with_non_matching_block_and_marker_end_not_after_newline = dedent(
  2696. """\
  2697. Hello world!
  2698. # start
  2699. No match here# end
  2700. # comment here
  2701. """
  2702. )
  2703. with_matching_block = dedent(
  2704. """\
  2705. Hello world!
  2706. # start
  2707. Line 1 of block
  2708. Line 2 of block
  2709. # end
  2710. # comment here
  2711. """
  2712. )
  2713. with_matching_block_and_extra_newline = dedent(
  2714. """\
  2715. Hello world!
  2716. # start
  2717. Line 1 of block
  2718. Line 2 of block
  2719. # end
  2720. # comment here
  2721. """
  2722. )
  2723. with_matching_block_and_marker_end_not_after_newline = dedent(
  2724. """\
  2725. Hello world!
  2726. # start
  2727. Line 1 of block
  2728. Line 2 of block# end
  2729. # comment here
  2730. """
  2731. )
  2732. content_explicit_posix_newlines = "Line 1 of block\n" "Line 2 of block\n"
  2733. content_explicit_windows_newlines = "Line 1 of block\r\n" "Line 2 of block\r\n"
  2734. without_block_explicit_posix_newlines = "Hello world!\n\n" "# comment here\n"
  2735. without_block_explicit_windows_newlines = (
  2736. "Hello world!\r\n\r\n" "# comment here\r\n"
  2737. )
  2738. with_block_prepended_explicit_posix_newlines = (
  2739. "# start\n"
  2740. "Line 1 of block\n"
  2741. "Line 2 of block\n"
  2742. "# end\n"
  2743. "Hello world!\n\n"
  2744. "# comment here\n"
  2745. )
  2746. with_block_prepended_explicit_windows_newlines = (
  2747. "# start\r\n"
  2748. "Line 1 of block\r\n"
  2749. "Line 2 of block\r\n"
  2750. "# end\r\n"
  2751. "Hello world!\r\n\r\n"
  2752. "# comment here\r\n"
  2753. )
  2754. with_block_appended_explicit_posix_newlines = (
  2755. "Hello world!\n\n"
  2756. "# comment here\n"
  2757. "# start\n"
  2758. "Line 1 of block\n"
  2759. "Line 2 of block\n"
  2760. "# end\n"
  2761. )
  2762. with_block_appended_explicit_windows_newlines = (
  2763. "Hello world!\r\n\r\n"
  2764. "# comment here\r\n"
  2765. "# start\r\n"
  2766. "Line 1 of block\r\n"
  2767. "Line 2 of block\r\n"
  2768. "# end\r\n"
  2769. )
  2770. @staticmethod
  2771. def _write(dest, content):
  2772. with salt.utils.files.fopen(dest, "wb") as fp_:
  2773. fp_.write(salt.utils.stringutils.to_bytes(content))
  2774. @staticmethod
  2775. def _read(src):
  2776. with salt.utils.files.fopen(src, "rb") as fp_:
  2777. return salt.utils.stringutils.to_unicode(fp_.read())
  2778. @with_tempfile()
  2779. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  2780. def test_prepend(self, name):
  2781. """
  2782. Test blockreplace when prepend_if_not_found=True and block doesn't
  2783. exist in file.
  2784. """
  2785. expected = (
  2786. self.marker_start
  2787. + os.linesep
  2788. + self.content
  2789. + self.marker_end
  2790. + os.linesep
  2791. + self.without_block
  2792. )
  2793. # Pass 1: content ends in newline
  2794. self._write(name, self.without_block)
  2795. ret = self.run_state(
  2796. "file.blockreplace",
  2797. name=name,
  2798. content=self.content,
  2799. marker_start=self.marker_start,
  2800. marker_end=self.marker_end,
  2801. prepend_if_not_found=True,
  2802. )
  2803. self.assertSaltTrueReturn(ret)
  2804. self.assertTrue(ret[next(iter(ret))]["changes"])
  2805. self.assertEqual(self._read(name), expected)
  2806. # Pass 1a: Re-run state, no changes should be made
  2807. ret = self.run_state(
  2808. "file.blockreplace",
  2809. name=name,
  2810. content=self.content,
  2811. marker_start=self.marker_start,
  2812. marker_end=self.marker_end,
  2813. prepend_if_not_found=True,
  2814. )
  2815. self.assertSaltTrueReturn(ret)
  2816. self.assertFalse(ret[next(iter(ret))]["changes"])
  2817. self.assertEqual(self._read(name), expected)
  2818. # Pass 2: content does not end in newline
  2819. self._write(name, self.without_block)
  2820. ret = self.run_state(
  2821. "file.blockreplace",
  2822. name=name,
  2823. content=self.content.rstrip("\r\n"),
  2824. marker_start=self.marker_start,
  2825. marker_end=self.marker_end,
  2826. prepend_if_not_found=True,
  2827. )
  2828. self.assertSaltTrueReturn(ret)
  2829. self.assertTrue(ret[next(iter(ret))]["changes"])
  2830. self.assertEqual(self._read(name), expected)
  2831. # Pass 2a: Re-run state, no changes should be made
  2832. ret = self.run_state(
  2833. "file.blockreplace",
  2834. name=name,
  2835. content=self.content.rstrip("\r\n"),
  2836. marker_start=self.marker_start,
  2837. marker_end=self.marker_end,
  2838. prepend_if_not_found=True,
  2839. )
  2840. self.assertSaltTrueReturn(ret)
  2841. self.assertFalse(ret[next(iter(ret))]["changes"])
  2842. self.assertEqual(self._read(name), expected)
  2843. @with_tempfile()
  2844. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  2845. def test_prepend_append_newline(self, name):
  2846. """
  2847. Test blockreplace when prepend_if_not_found=True and block doesn't
  2848. exist in file. Test with append_newline explicitly set to True.
  2849. """
  2850. # Pass 1: content ends in newline
  2851. expected = (
  2852. self.marker_start
  2853. + os.linesep
  2854. + self.content
  2855. + os.linesep
  2856. + self.marker_end
  2857. + os.linesep
  2858. + self.without_block
  2859. )
  2860. self._write(name, self.without_block)
  2861. ret = self.run_state(
  2862. "file.blockreplace",
  2863. name=name,
  2864. content=self.content,
  2865. marker_start=self.marker_start,
  2866. marker_end=self.marker_end,
  2867. prepend_if_not_found=True,
  2868. append_newline=True,
  2869. )
  2870. self.assertSaltTrueReturn(ret)
  2871. self.assertTrue(ret[next(iter(ret))]["changes"])
  2872. self.assertEqual(self._read(name), expected)
  2873. # Pass 1a: Re-run state, no changes should be made
  2874. ret = self.run_state(
  2875. "file.blockreplace",
  2876. name=name,
  2877. content=self.content,
  2878. marker_start=self.marker_start,
  2879. marker_end=self.marker_end,
  2880. prepend_if_not_found=True,
  2881. append_newline=True,
  2882. )
  2883. self.assertSaltTrueReturn(ret)
  2884. self.assertFalse(ret[next(iter(ret))]["changes"])
  2885. self.assertEqual(self._read(name), expected)
  2886. # Pass 2: content does not end in newline
  2887. expected = (
  2888. self.marker_start
  2889. + os.linesep
  2890. + self.content
  2891. + self.marker_end
  2892. + os.linesep
  2893. + self.without_block
  2894. )
  2895. self._write(name, self.without_block)
  2896. ret = self.run_state(
  2897. "file.blockreplace",
  2898. name=name,
  2899. content=self.content.rstrip("\r\n"),
  2900. marker_start=self.marker_start,
  2901. marker_end=self.marker_end,
  2902. prepend_if_not_found=True,
  2903. append_newline=True,
  2904. )
  2905. self.assertSaltTrueReturn(ret)
  2906. self.assertTrue(ret[next(iter(ret))]["changes"])
  2907. self.assertEqual(self._read(name), expected)
  2908. # Pass 2a: Re-run state, no changes should be made
  2909. ret = self.run_state(
  2910. "file.blockreplace",
  2911. name=name,
  2912. content=self.content.rstrip("\r\n"),
  2913. marker_start=self.marker_start,
  2914. marker_end=self.marker_end,
  2915. prepend_if_not_found=True,
  2916. append_newline=True,
  2917. )
  2918. self.assertSaltTrueReturn(ret)
  2919. self.assertFalse(ret[next(iter(ret))]["changes"])
  2920. self.assertEqual(self._read(name), expected)
  2921. @with_tempfile()
  2922. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  2923. def test_prepend_no_append_newline(self, name):
  2924. """
  2925. Test blockreplace when prepend_if_not_found=True and block doesn't
  2926. exist in file. Test with append_newline explicitly set to False.
  2927. """
  2928. # Pass 1: content ends in newline
  2929. expected = (
  2930. self.marker_start
  2931. + os.linesep
  2932. + self.content
  2933. + self.marker_end
  2934. + os.linesep
  2935. + self.without_block
  2936. )
  2937. self._write(name, self.without_block)
  2938. ret = self.run_state(
  2939. "file.blockreplace",
  2940. name=name,
  2941. content=self.content,
  2942. marker_start=self.marker_start,
  2943. marker_end=self.marker_end,
  2944. prepend_if_not_found=True,
  2945. append_newline=False,
  2946. )
  2947. self.assertSaltTrueReturn(ret)
  2948. self.assertTrue(ret[next(iter(ret))]["changes"])
  2949. self.assertEqual(self._read(name), expected)
  2950. # Pass 1a: Re-run state, no changes should be made
  2951. ret = self.run_state(
  2952. "file.blockreplace",
  2953. name=name,
  2954. content=self.content,
  2955. marker_start=self.marker_start,
  2956. marker_end=self.marker_end,
  2957. prepend_if_not_found=True,
  2958. append_newline=False,
  2959. )
  2960. self.assertSaltTrueReturn(ret)
  2961. self.assertFalse(ret[next(iter(ret))]["changes"])
  2962. self.assertEqual(self._read(name), expected)
  2963. # Pass 2: content does not end in newline
  2964. expected = (
  2965. self.marker_start
  2966. + os.linesep
  2967. + self.content.rstrip("\r\n")
  2968. + self.marker_end
  2969. + os.linesep
  2970. + self.without_block
  2971. )
  2972. self._write(name, self.without_block)
  2973. ret = self.run_state(
  2974. "file.blockreplace",
  2975. name=name,
  2976. content=self.content.rstrip("\r\n"),
  2977. marker_start=self.marker_start,
  2978. marker_end=self.marker_end,
  2979. prepend_if_not_found=True,
  2980. append_newline=False,
  2981. )
  2982. self.assertSaltTrueReturn(ret)
  2983. self.assertTrue(ret[next(iter(ret))]["changes"])
  2984. self.assertEqual(self._read(name), expected)
  2985. # Pass 2a: Re-run state, no changes should be made
  2986. ret = self.run_state(
  2987. "file.blockreplace",
  2988. name=name,
  2989. content=self.content.rstrip("\r\n"),
  2990. marker_start=self.marker_start,
  2991. marker_end=self.marker_end,
  2992. prepend_if_not_found=True,
  2993. append_newline=False,
  2994. )
  2995. self.assertSaltTrueReturn(ret)
  2996. self.assertFalse(ret[next(iter(ret))]["changes"])
  2997. self.assertEqual(self._read(name), expected)
  2998. @with_tempfile()
  2999. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3000. def test_append(self, name):
  3001. """
  3002. Test blockreplace when append_if_not_found=True and block doesn't
  3003. exist in file.
  3004. """
  3005. expected = (
  3006. self.without_block
  3007. + self.marker_start
  3008. + os.linesep
  3009. + self.content
  3010. + self.marker_end
  3011. + os.linesep
  3012. )
  3013. # Pass 1: content ends in newline
  3014. self._write(name, self.without_block)
  3015. ret = self.run_state(
  3016. "file.blockreplace",
  3017. name=name,
  3018. content=self.content,
  3019. marker_start=self.marker_start,
  3020. marker_end=self.marker_end,
  3021. append_if_not_found=True,
  3022. )
  3023. self.assertSaltTrueReturn(ret)
  3024. self.assertTrue(ret[next(iter(ret))]["changes"])
  3025. self.assertEqual(self._read(name), expected)
  3026. # Pass 1a: Re-run state, no changes should be made
  3027. ret = self.run_state(
  3028. "file.blockreplace",
  3029. name=name,
  3030. content=self.content,
  3031. marker_start=self.marker_start,
  3032. marker_end=self.marker_end,
  3033. append_if_not_found=True,
  3034. )
  3035. self.assertSaltTrueReturn(ret)
  3036. self.assertFalse(ret[next(iter(ret))]["changes"])
  3037. self.assertEqual(self._read(name), expected)
  3038. # Pass 2: content does not end in newline
  3039. self._write(name, self.without_block)
  3040. ret = self.run_state(
  3041. "file.blockreplace",
  3042. name=name,
  3043. content=self.content.rstrip("\r\n"),
  3044. marker_start=self.marker_start,
  3045. marker_end=self.marker_end,
  3046. append_if_not_found=True,
  3047. )
  3048. self.assertSaltTrueReturn(ret)
  3049. self.assertTrue(ret[next(iter(ret))]["changes"])
  3050. self.assertEqual(self._read(name), expected)
  3051. # Pass 2a: Re-run state, no changes should be made
  3052. ret = self.run_state(
  3053. "file.blockreplace",
  3054. name=name,
  3055. content=self.content.rstrip("\r\n"),
  3056. marker_start=self.marker_start,
  3057. marker_end=self.marker_end,
  3058. append_if_not_found=True,
  3059. )
  3060. self.assertSaltTrueReturn(ret)
  3061. self.assertFalse(ret[next(iter(ret))]["changes"])
  3062. self.assertEqual(self._read(name), expected)
  3063. @with_tempfile()
  3064. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3065. def test_append_append_newline(self, name):
  3066. """
  3067. Test blockreplace when append_if_not_found=True and block doesn't
  3068. exist in file. Test with append_newline explicitly set to True.
  3069. """
  3070. # Pass 1: content ends in newline
  3071. expected = (
  3072. self.without_block
  3073. + self.marker_start
  3074. + os.linesep
  3075. + self.content
  3076. + os.linesep
  3077. + self.marker_end
  3078. + os.linesep
  3079. )
  3080. self._write(name, self.without_block)
  3081. ret = self.run_state(
  3082. "file.blockreplace",
  3083. name=name,
  3084. content=self.content,
  3085. marker_start=self.marker_start,
  3086. marker_end=self.marker_end,
  3087. append_if_not_found=True,
  3088. append_newline=True,
  3089. )
  3090. self.assertSaltTrueReturn(ret)
  3091. self.assertTrue(ret[next(iter(ret))]["changes"])
  3092. self.assertEqual(self._read(name), expected)
  3093. # Pass 1a: Re-run state, no changes should be made
  3094. ret = self.run_state(
  3095. "file.blockreplace",
  3096. name=name,
  3097. content=self.content,
  3098. marker_start=self.marker_start,
  3099. marker_end=self.marker_end,
  3100. append_if_not_found=True,
  3101. append_newline=True,
  3102. )
  3103. self.assertSaltTrueReturn(ret)
  3104. self.assertFalse(ret[next(iter(ret))]["changes"])
  3105. self.assertEqual(self._read(name), expected)
  3106. # Pass 2: content does not end in newline
  3107. expected = (
  3108. self.without_block
  3109. + self.marker_start
  3110. + os.linesep
  3111. + self.content
  3112. + self.marker_end
  3113. + os.linesep
  3114. )
  3115. self._write(name, self.without_block)
  3116. ret = self.run_state(
  3117. "file.blockreplace",
  3118. name=name,
  3119. content=self.content.rstrip("\r\n"),
  3120. marker_start=self.marker_start,
  3121. marker_end=self.marker_end,
  3122. append_if_not_found=True,
  3123. append_newline=True,
  3124. )
  3125. self.assertSaltTrueReturn(ret)
  3126. self.assertTrue(ret[next(iter(ret))]["changes"])
  3127. self.assertEqual(self._read(name), expected)
  3128. # Pass 2a: Re-run state, no changes should be made
  3129. ret = self.run_state(
  3130. "file.blockreplace",
  3131. name=name,
  3132. content=self.content.rstrip("\r\n"),
  3133. marker_start=self.marker_start,
  3134. marker_end=self.marker_end,
  3135. append_if_not_found=True,
  3136. append_newline=True,
  3137. )
  3138. self.assertSaltTrueReturn(ret)
  3139. self.assertFalse(ret[next(iter(ret))]["changes"])
  3140. self.assertEqual(self._read(name), expected)
  3141. @with_tempfile()
  3142. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3143. def test_append_no_append_newline(self, name):
  3144. """
  3145. Test blockreplace when append_if_not_found=True and block doesn't
  3146. exist in file. Test with append_newline explicitly set to False.
  3147. """
  3148. # Pass 1: content ends in newline
  3149. expected = (
  3150. self.without_block
  3151. + self.marker_start
  3152. + os.linesep
  3153. + self.content
  3154. + self.marker_end
  3155. + os.linesep
  3156. )
  3157. self._write(name, self.without_block)
  3158. ret = self.run_state(
  3159. "file.blockreplace",
  3160. name=name,
  3161. content=self.content,
  3162. marker_start=self.marker_start,
  3163. marker_end=self.marker_end,
  3164. append_if_not_found=True,
  3165. append_newline=False,
  3166. )
  3167. self.assertSaltTrueReturn(ret)
  3168. self.assertTrue(ret[next(iter(ret))]["changes"])
  3169. self.assertEqual(self._read(name), expected)
  3170. # Pass 1a: Re-run state, no changes should be made
  3171. ret = self.run_state(
  3172. "file.blockreplace",
  3173. name=name,
  3174. content=self.content,
  3175. marker_start=self.marker_start,
  3176. marker_end=self.marker_end,
  3177. append_if_not_found=True,
  3178. append_newline=False,
  3179. )
  3180. self.assertSaltTrueReturn(ret)
  3181. self.assertFalse(ret[next(iter(ret))]["changes"])
  3182. self.assertEqual(self._read(name), expected)
  3183. # Pass 2: content does not end in newline
  3184. expected = (
  3185. self.without_block
  3186. + self.marker_start
  3187. + os.linesep
  3188. + self.content.rstrip("\r\n")
  3189. + self.marker_end
  3190. + os.linesep
  3191. )
  3192. self._write(name, self.without_block)
  3193. ret = self.run_state(
  3194. "file.blockreplace",
  3195. name=name,
  3196. content=self.content.rstrip("\r\n"),
  3197. marker_start=self.marker_start,
  3198. marker_end=self.marker_end,
  3199. append_if_not_found=True,
  3200. append_newline=False,
  3201. )
  3202. self.assertSaltTrueReturn(ret)
  3203. self.assertTrue(ret[next(iter(ret))]["changes"])
  3204. self.assertEqual(self._read(name), expected)
  3205. # Pass 2a: Re-run state, no changes should be made
  3206. ret = self.run_state(
  3207. "file.blockreplace",
  3208. name=name,
  3209. content=self.content.rstrip("\r\n"),
  3210. marker_start=self.marker_start,
  3211. marker_end=self.marker_end,
  3212. append_if_not_found=True,
  3213. append_newline=False,
  3214. )
  3215. self.assertSaltTrueReturn(ret)
  3216. self.assertFalse(ret[next(iter(ret))]["changes"])
  3217. self.assertEqual(self._read(name), expected)
  3218. @with_tempfile()
  3219. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3220. def test_prepend_auto_line_separator(self, name):
  3221. """
  3222. This tests the line separator auto-detection when prepending the block
  3223. """
  3224. # POSIX newlines to Windows newlines
  3225. self._write(name, self.without_block_explicit_windows_newlines)
  3226. ret = self.run_state(
  3227. "file.blockreplace",
  3228. name=name,
  3229. content=self.content_explicit_posix_newlines,
  3230. marker_start=self.marker_start,
  3231. marker_end=self.marker_end,
  3232. prepend_if_not_found=True,
  3233. )
  3234. self.assertSaltTrueReturn(ret)
  3235. self.assertTrue(ret[next(iter(ret))]["changes"])
  3236. self.assertEqual(
  3237. self._read(name), self.with_block_prepended_explicit_windows_newlines
  3238. )
  3239. # Re-run state, no changes should be made
  3240. ret = self.run_state(
  3241. "file.blockreplace",
  3242. name=name,
  3243. content=self.content_explicit_posix_newlines,
  3244. marker_start=self.marker_start,
  3245. marker_end=self.marker_end,
  3246. prepend_if_not_found=True,
  3247. )
  3248. self.assertSaltTrueReturn(ret)
  3249. self.assertFalse(ret[next(iter(ret))]["changes"])
  3250. self.assertEqual(
  3251. self._read(name), self.with_block_prepended_explicit_windows_newlines
  3252. )
  3253. # Windows newlines to POSIX newlines
  3254. self._write(name, self.without_block_explicit_posix_newlines)
  3255. ret = self.run_state(
  3256. "file.blockreplace",
  3257. name=name,
  3258. content=self.content_explicit_windows_newlines,
  3259. marker_start=self.marker_start,
  3260. marker_end=self.marker_end,
  3261. prepend_if_not_found=True,
  3262. )
  3263. self.assertSaltTrueReturn(ret)
  3264. self.assertTrue(ret[next(iter(ret))]["changes"])
  3265. self.assertEqual(
  3266. self._read(name), self.with_block_prepended_explicit_posix_newlines
  3267. )
  3268. # Re-run state, no changes should be made
  3269. ret = self.run_state(
  3270. "file.blockreplace",
  3271. name=name,
  3272. content=self.content_explicit_windows_newlines,
  3273. marker_start=self.marker_start,
  3274. marker_end=self.marker_end,
  3275. prepend_if_not_found=True,
  3276. )
  3277. self.assertSaltTrueReturn(ret)
  3278. self.assertFalse(ret[next(iter(ret))]["changes"])
  3279. self.assertEqual(
  3280. self._read(name), self.with_block_prepended_explicit_posix_newlines
  3281. )
  3282. @with_tempfile()
  3283. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3284. def test_append_auto_line_separator(self, name):
  3285. """
  3286. This tests the line separator auto-detection when appending the block
  3287. """
  3288. # POSIX newlines to Windows newlines
  3289. self._write(name, self.without_block_explicit_windows_newlines)
  3290. ret = self.run_state(
  3291. "file.blockreplace",
  3292. name=name,
  3293. content=self.content_explicit_posix_newlines,
  3294. marker_start=self.marker_start,
  3295. marker_end=self.marker_end,
  3296. append_if_not_found=True,
  3297. )
  3298. self.assertSaltTrueReturn(ret)
  3299. self.assertTrue(ret[next(iter(ret))]["changes"])
  3300. self.assertEqual(
  3301. self._read(name), self.with_block_appended_explicit_windows_newlines
  3302. )
  3303. # Re-run state, no changes should be made
  3304. ret = self.run_state(
  3305. "file.blockreplace",
  3306. name=name,
  3307. content=self.content_explicit_posix_newlines,
  3308. marker_start=self.marker_start,
  3309. marker_end=self.marker_end,
  3310. append_if_not_found=True,
  3311. )
  3312. self.assertSaltTrueReturn(ret)
  3313. self.assertFalse(ret[next(iter(ret))]["changes"])
  3314. self.assertEqual(
  3315. self._read(name), self.with_block_appended_explicit_windows_newlines
  3316. )
  3317. # Windows newlines to POSIX newlines
  3318. self._write(name, self.without_block_explicit_posix_newlines)
  3319. ret = self.run_state(
  3320. "file.blockreplace",
  3321. name=name,
  3322. content=self.content_explicit_windows_newlines,
  3323. marker_start=self.marker_start,
  3324. marker_end=self.marker_end,
  3325. append_if_not_found=True,
  3326. )
  3327. self.assertSaltTrueReturn(ret)
  3328. self.assertTrue(ret[next(iter(ret))]["changes"])
  3329. self.assertEqual(
  3330. self._read(name), self.with_block_appended_explicit_posix_newlines
  3331. )
  3332. # Re-run state, no changes should be made
  3333. ret = self.run_state(
  3334. "file.blockreplace",
  3335. name=name,
  3336. content=self.content_explicit_windows_newlines,
  3337. marker_start=self.marker_start,
  3338. marker_end=self.marker_end,
  3339. append_if_not_found=True,
  3340. )
  3341. self.assertSaltTrueReturn(ret)
  3342. self.assertFalse(ret[next(iter(ret))]["changes"])
  3343. self.assertEqual(
  3344. self._read(name), self.with_block_appended_explicit_posix_newlines
  3345. )
  3346. @with_tempfile()
  3347. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3348. def test_non_matching_block(self, name):
  3349. """
  3350. Test blockreplace when block exists but its contents are not a
  3351. match.
  3352. """
  3353. # Pass 1: content ends in newline
  3354. self._write(name, self.with_non_matching_block)
  3355. ret = self.run_state(
  3356. "file.blockreplace",
  3357. name=name,
  3358. content=self.content,
  3359. marker_start=self.marker_start,
  3360. marker_end=self.marker_end,
  3361. )
  3362. self.assertSaltTrueReturn(ret)
  3363. self.assertTrue(ret[next(iter(ret))]["changes"])
  3364. self.assertEqual(self._read(name), self.with_matching_block)
  3365. # Pass 1a: Re-run state, no changes should be made
  3366. ret = self.run_state(
  3367. "file.blockreplace",
  3368. name=name,
  3369. content=self.content,
  3370. marker_start=self.marker_start,
  3371. marker_end=self.marker_end,
  3372. )
  3373. self.assertSaltTrueReturn(ret)
  3374. self.assertFalse(ret[next(iter(ret))]["changes"])
  3375. self.assertEqual(self._read(name), self.with_matching_block)
  3376. # Pass 2: content does not end in newline
  3377. self._write(name, self.with_non_matching_block)
  3378. ret = self.run_state(
  3379. "file.blockreplace",
  3380. name=name,
  3381. content=self.content.rstrip("\r\n"),
  3382. marker_start=self.marker_start,
  3383. marker_end=self.marker_end,
  3384. )
  3385. self.assertSaltTrueReturn(ret)
  3386. self.assertTrue(ret[next(iter(ret))]["changes"])
  3387. self.assertEqual(self._read(name), self.with_matching_block)
  3388. # Pass 2a: Re-run state, no changes should be made
  3389. ret = self.run_state(
  3390. "file.blockreplace",
  3391. name=name,
  3392. content=self.content.rstrip("\r\n"),
  3393. marker_start=self.marker_start,
  3394. marker_end=self.marker_end,
  3395. )
  3396. self.assertSaltTrueReturn(ret)
  3397. self.assertFalse(ret[next(iter(ret))]["changes"])
  3398. self.assertEqual(self._read(name), self.with_matching_block)
  3399. @with_tempfile()
  3400. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3401. def test_non_matching_block_append_newline(self, name):
  3402. """
  3403. Test blockreplace when block exists but its contents are not a
  3404. match. Test with append_newline explicitly set to True.
  3405. """
  3406. # Pass 1: content ends in newline
  3407. self._write(name, self.with_non_matching_block)
  3408. ret = self.run_state(
  3409. "file.blockreplace",
  3410. name=name,
  3411. content=self.content,
  3412. marker_start=self.marker_start,
  3413. marker_end=self.marker_end,
  3414. append_newline=True,
  3415. )
  3416. self.assertSaltTrueReturn(ret)
  3417. self.assertTrue(ret[next(iter(ret))]["changes"])
  3418. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3419. # Pass 1a: Re-run state, no changes should be made
  3420. ret = self.run_state(
  3421. "file.blockreplace",
  3422. name=name,
  3423. content=self.content,
  3424. marker_start=self.marker_start,
  3425. marker_end=self.marker_end,
  3426. append_newline=True,
  3427. )
  3428. self.assertSaltTrueReturn(ret)
  3429. self.assertFalse(ret[next(iter(ret))]["changes"])
  3430. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3431. # Pass 2: content does not end in newline
  3432. self._write(name, self.with_non_matching_block)
  3433. ret = self.run_state(
  3434. "file.blockreplace",
  3435. name=name,
  3436. content=self.content.rstrip("\r\n"),
  3437. marker_start=self.marker_start,
  3438. marker_end=self.marker_end,
  3439. append_newline=True,
  3440. )
  3441. self.assertSaltTrueReturn(ret)
  3442. self.assertTrue(ret[next(iter(ret))]["changes"])
  3443. self.assertEqual(self._read(name), self.with_matching_block)
  3444. # Pass 2a: Re-run state, no changes should be made
  3445. ret = self.run_state(
  3446. "file.blockreplace",
  3447. name=name,
  3448. content=self.content.rstrip("\r\n"),
  3449. marker_start=self.marker_start,
  3450. marker_end=self.marker_end,
  3451. append_newline=True,
  3452. )
  3453. self.assertSaltTrueReturn(ret)
  3454. self.assertFalse(ret[next(iter(ret))]["changes"])
  3455. self.assertEqual(self._read(name), self.with_matching_block)
  3456. @with_tempfile()
  3457. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3458. def test_non_matching_block_no_append_newline(self, name):
  3459. """
  3460. Test blockreplace when block exists but its contents are not a
  3461. match. Test with append_newline explicitly set to False.
  3462. """
  3463. # Pass 1: content ends in newline
  3464. self._write(name, self.with_non_matching_block)
  3465. ret = self.run_state(
  3466. "file.blockreplace",
  3467. name=name,
  3468. content=self.content,
  3469. marker_start=self.marker_start,
  3470. marker_end=self.marker_end,
  3471. append_newline=False,
  3472. )
  3473. self.assertSaltTrueReturn(ret)
  3474. self.assertTrue(ret[next(iter(ret))]["changes"])
  3475. self.assertEqual(self._read(name), self.with_matching_block)
  3476. # Pass 1a: Re-run state, no changes should be made
  3477. ret = self.run_state(
  3478. "file.blockreplace",
  3479. name=name,
  3480. content=self.content,
  3481. marker_start=self.marker_start,
  3482. marker_end=self.marker_end,
  3483. append_newline=False,
  3484. )
  3485. self.assertSaltTrueReturn(ret)
  3486. self.assertFalse(ret[next(iter(ret))]["changes"])
  3487. self.assertEqual(self._read(name), self.with_matching_block)
  3488. # Pass 2: content does not end in newline
  3489. self._write(name, self.with_non_matching_block)
  3490. ret = self.run_state(
  3491. "file.blockreplace",
  3492. name=name,
  3493. content=self.content.rstrip("\r\n"),
  3494. marker_start=self.marker_start,
  3495. marker_end=self.marker_end,
  3496. append_newline=False,
  3497. )
  3498. self.assertSaltTrueReturn(ret)
  3499. self.assertTrue(ret[next(iter(ret))]["changes"])
  3500. self.assertEqual(
  3501. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3502. )
  3503. # Pass 2a: Re-run state, no changes should be made
  3504. ret = self.run_state(
  3505. "file.blockreplace",
  3506. name=name,
  3507. content=self.content.rstrip("\r\n"),
  3508. marker_start=self.marker_start,
  3509. marker_end=self.marker_end,
  3510. append_newline=False,
  3511. )
  3512. self.assertSaltTrueReturn(ret)
  3513. self.assertFalse(ret[next(iter(ret))]["changes"])
  3514. self.assertEqual(
  3515. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3516. )
  3517. @with_tempfile()
  3518. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3519. def test_non_matching_block_and_marker_not_after_newline(self, name):
  3520. """
  3521. Test blockreplace when block exists but its contents are not a
  3522. match, and the marker_end is not directly preceded by a newline.
  3523. """
  3524. # Pass 1: content ends in newline
  3525. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3526. ret = self.run_state(
  3527. "file.blockreplace",
  3528. name=name,
  3529. content=self.content,
  3530. marker_start=self.marker_start,
  3531. marker_end=self.marker_end,
  3532. )
  3533. self.assertSaltTrueReturn(ret)
  3534. self.assertTrue(ret[next(iter(ret))]["changes"])
  3535. self.assertEqual(self._read(name), self.with_matching_block)
  3536. # Pass 1a: Re-run state, no changes should be made
  3537. ret = self.run_state(
  3538. "file.blockreplace",
  3539. name=name,
  3540. content=self.content,
  3541. marker_start=self.marker_start,
  3542. marker_end=self.marker_end,
  3543. )
  3544. self.assertSaltTrueReturn(ret)
  3545. self.assertFalse(ret[next(iter(ret))]["changes"])
  3546. self.assertEqual(self._read(name), self.with_matching_block)
  3547. # Pass 2: content does not end in newline
  3548. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3549. ret = self.run_state(
  3550. "file.blockreplace",
  3551. name=name,
  3552. content=self.content.rstrip("\r\n"),
  3553. marker_start=self.marker_start,
  3554. marker_end=self.marker_end,
  3555. )
  3556. self.assertSaltTrueReturn(ret)
  3557. self.assertTrue(ret[next(iter(ret))]["changes"])
  3558. self.assertEqual(self._read(name), self.with_matching_block)
  3559. # Pass 2a: Re-run state, no changes should be made
  3560. ret = self.run_state(
  3561. "file.blockreplace",
  3562. name=name,
  3563. content=self.content.rstrip("\r\n"),
  3564. marker_start=self.marker_start,
  3565. marker_end=self.marker_end,
  3566. )
  3567. self.assertSaltTrueReturn(ret)
  3568. self.assertFalse(ret[next(iter(ret))]["changes"])
  3569. self.assertEqual(self._read(name), self.with_matching_block)
  3570. @with_tempfile()
  3571. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3572. def test_non_matching_block_and_marker_not_after_newline_append_newline(self, name):
  3573. """
  3574. Test blockreplace when block exists but its contents are not a match,
  3575. and the marker_end is not directly preceded by a newline. Test with
  3576. append_newline explicitly set to True.
  3577. """
  3578. # Pass 1: content ends in newline
  3579. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3580. ret = self.run_state(
  3581. "file.blockreplace",
  3582. name=name,
  3583. content=self.content,
  3584. marker_start=self.marker_start,
  3585. marker_end=self.marker_end,
  3586. append_newline=True,
  3587. )
  3588. self.assertSaltTrueReturn(ret)
  3589. self.assertTrue(ret[next(iter(ret))]["changes"])
  3590. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3591. # Pass 1a: Re-run state, no changes should be made
  3592. ret = self.run_state(
  3593. "file.blockreplace",
  3594. name=name,
  3595. content=self.content,
  3596. marker_start=self.marker_start,
  3597. marker_end=self.marker_end,
  3598. append_newline=True,
  3599. )
  3600. self.assertSaltTrueReturn(ret)
  3601. self.assertFalse(ret[next(iter(ret))]["changes"])
  3602. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3603. # Pass 2: content does not end in newline
  3604. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3605. ret = self.run_state(
  3606. "file.blockreplace",
  3607. name=name,
  3608. content=self.content.rstrip("\r\n"),
  3609. marker_start=self.marker_start,
  3610. marker_end=self.marker_end,
  3611. append_newline=True,
  3612. )
  3613. self.assertSaltTrueReturn(ret)
  3614. self.assertTrue(ret[next(iter(ret))]["changes"])
  3615. self.assertEqual(self._read(name), self.with_matching_block)
  3616. # Pass 2a: Re-run state, no changes should be made
  3617. ret = self.run_state(
  3618. "file.blockreplace",
  3619. name=name,
  3620. content=self.content.rstrip("\r\n"),
  3621. marker_start=self.marker_start,
  3622. marker_end=self.marker_end,
  3623. append_newline=True,
  3624. )
  3625. self.assertSaltTrueReturn(ret)
  3626. self.assertFalse(ret[next(iter(ret))]["changes"])
  3627. self.assertEqual(self._read(name), self.with_matching_block)
  3628. @with_tempfile()
  3629. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3630. def test_non_matching_block_and_marker_not_after_newline_no_append_newline(
  3631. self, name
  3632. ):
  3633. """
  3634. Test blockreplace when block exists but its contents are not a match,
  3635. and the marker_end is not directly preceded by a newline. Test with
  3636. append_newline explicitly set to False.
  3637. """
  3638. # Pass 1: content ends in newline
  3639. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3640. ret = self.run_state(
  3641. "file.blockreplace",
  3642. name=name,
  3643. content=self.content,
  3644. marker_start=self.marker_start,
  3645. marker_end=self.marker_end,
  3646. append_newline=False,
  3647. )
  3648. self.assertSaltTrueReturn(ret)
  3649. self.assertTrue(ret[next(iter(ret))]["changes"])
  3650. self.assertEqual(self._read(name), self.with_matching_block)
  3651. # Pass 1a: Re-run state, no changes should be made
  3652. ret = self.run_state(
  3653. "file.blockreplace",
  3654. name=name,
  3655. content=self.content,
  3656. marker_start=self.marker_start,
  3657. marker_end=self.marker_end,
  3658. append_newline=False,
  3659. )
  3660. self.assertSaltTrueReturn(ret)
  3661. self.assertFalse(ret[next(iter(ret))]["changes"])
  3662. self.assertEqual(self._read(name), self.with_matching_block)
  3663. # Pass 2: content does not end in newline
  3664. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3665. ret = self.run_state(
  3666. "file.blockreplace",
  3667. name=name,
  3668. content=self.content.rstrip("\r\n"),
  3669. marker_start=self.marker_start,
  3670. marker_end=self.marker_end,
  3671. append_newline=False,
  3672. )
  3673. self.assertSaltTrueReturn(ret)
  3674. self.assertTrue(ret[next(iter(ret))]["changes"])
  3675. self.assertEqual(
  3676. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3677. )
  3678. # Pass 2a: Re-run state, no changes should be made
  3679. ret = self.run_state(
  3680. "file.blockreplace",
  3681. name=name,
  3682. content=self.content.rstrip("\r\n"),
  3683. marker_start=self.marker_start,
  3684. marker_end=self.marker_end,
  3685. append_newline=False,
  3686. )
  3687. self.assertSaltTrueReturn(ret)
  3688. self.assertFalse(ret[next(iter(ret))]["changes"])
  3689. self.assertEqual(
  3690. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3691. )
  3692. @with_tempfile()
  3693. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3694. def test_matching_block(self, name):
  3695. """
  3696. Test blockreplace when block exists and its contents are a match. No
  3697. changes should be made.
  3698. """
  3699. # Pass 1: content ends in newline
  3700. self._write(name, self.with_matching_block)
  3701. ret = self.run_state(
  3702. "file.blockreplace",
  3703. name=name,
  3704. content=self.content,
  3705. marker_start=self.marker_start,
  3706. marker_end=self.marker_end,
  3707. )
  3708. self.assertSaltTrueReturn(ret)
  3709. self.assertFalse(ret[next(iter(ret))]["changes"])
  3710. self.assertEqual(self._read(name), self.with_matching_block)
  3711. # Pass 1a: Re-run state, no changes should be made
  3712. ret = self.run_state(
  3713. "file.blockreplace",
  3714. name=name,
  3715. content=self.content,
  3716. marker_start=self.marker_start,
  3717. marker_end=self.marker_end,
  3718. )
  3719. self.assertSaltTrueReturn(ret)
  3720. self.assertFalse(ret[next(iter(ret))]["changes"])
  3721. self.assertEqual(self._read(name), self.with_matching_block)
  3722. # Pass 2: content does not end in newline
  3723. self._write(name, self.with_matching_block)
  3724. ret = self.run_state(
  3725. "file.blockreplace",
  3726. name=name,
  3727. content=self.content.rstrip("\r\n"),
  3728. marker_start=self.marker_start,
  3729. marker_end=self.marker_end,
  3730. )
  3731. self.assertSaltTrueReturn(ret)
  3732. self.assertFalse(ret[next(iter(ret))]["changes"])
  3733. self.assertEqual(self._read(name), self.with_matching_block)
  3734. # Pass 2a: Re-run state, no changes should be made
  3735. ret = self.run_state(
  3736. "file.blockreplace",
  3737. name=name,
  3738. content=self.content.rstrip("\r\n"),
  3739. marker_start=self.marker_start,
  3740. marker_end=self.marker_end,
  3741. )
  3742. self.assertSaltTrueReturn(ret)
  3743. self.assertFalse(ret[next(iter(ret))]["changes"])
  3744. self.assertEqual(self._read(name), self.with_matching_block)
  3745. @with_tempfile()
  3746. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3747. def test_matching_block_append_newline(self, name):
  3748. """
  3749. Test blockreplace when block exists and its contents are a match. Test
  3750. with append_newline explicitly set to True. This will result in an
  3751. extra newline when the content ends in a newline, and will not when the
  3752. content does not end in a newline.
  3753. """
  3754. # Pass 1: content ends in newline
  3755. self._write(name, self.with_matching_block)
  3756. ret = self.run_state(
  3757. "file.blockreplace",
  3758. name=name,
  3759. content=self.content,
  3760. marker_start=self.marker_start,
  3761. marker_end=self.marker_end,
  3762. append_newline=True,
  3763. )
  3764. self.assertSaltTrueReturn(ret)
  3765. self.assertTrue(ret[next(iter(ret))]["changes"])
  3766. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3767. # Pass 1a: Re-run state, no changes should be made
  3768. ret = self.run_state(
  3769. "file.blockreplace",
  3770. name=name,
  3771. content=self.content,
  3772. marker_start=self.marker_start,
  3773. marker_end=self.marker_end,
  3774. append_newline=True,
  3775. )
  3776. self.assertSaltTrueReturn(ret)
  3777. self.assertFalse(ret[next(iter(ret))]["changes"])
  3778. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3779. # Pass 2: content does not end in newline
  3780. self._write(name, self.with_matching_block)
  3781. ret = self.run_state(
  3782. "file.blockreplace",
  3783. name=name,
  3784. content=self.content.rstrip("\r\n"),
  3785. marker_start=self.marker_start,
  3786. marker_end=self.marker_end,
  3787. append_newline=True,
  3788. )
  3789. self.assertSaltTrueReturn(ret)
  3790. self.assertFalse(ret[next(iter(ret))]["changes"])
  3791. self.assertEqual(self._read(name), self.with_matching_block)
  3792. # Pass 2a: Re-run state, no changes should be made
  3793. ret = self.run_state(
  3794. "file.blockreplace",
  3795. name=name,
  3796. content=self.content.rstrip("\r\n"),
  3797. marker_start=self.marker_start,
  3798. marker_end=self.marker_end,
  3799. append_newline=True,
  3800. )
  3801. self.assertSaltTrueReturn(ret)
  3802. self.assertFalse(ret[next(iter(ret))]["changes"])
  3803. self.assertEqual(self._read(name), self.with_matching_block)
  3804. @with_tempfile()
  3805. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3806. def test_matching_block_no_append_newline(self, name):
  3807. """
  3808. Test blockreplace when block exists and its contents are a match. Test
  3809. with append_newline explicitly set to False. This will result in the
  3810. marker_end not being directly preceded by a newline when the content
  3811. does not end in a newline.
  3812. """
  3813. # Pass 1: content ends in newline
  3814. self._write(name, self.with_matching_block)
  3815. ret = self.run_state(
  3816. "file.blockreplace",
  3817. name=name,
  3818. content=self.content,
  3819. marker_start=self.marker_start,
  3820. marker_end=self.marker_end,
  3821. append_newline=False,
  3822. )
  3823. self.assertSaltTrueReturn(ret)
  3824. self.assertFalse(ret[next(iter(ret))]["changes"])
  3825. self.assertEqual(self._read(name), self.with_matching_block)
  3826. # Pass 1a: Re-run state, no changes should be made
  3827. ret = self.run_state(
  3828. "file.blockreplace",
  3829. name=name,
  3830. content=self.content,
  3831. marker_start=self.marker_start,
  3832. marker_end=self.marker_end,
  3833. append_newline=False,
  3834. )
  3835. self.assertSaltTrueReturn(ret)
  3836. self.assertFalse(ret[next(iter(ret))]["changes"])
  3837. self.assertEqual(self._read(name), self.with_matching_block)
  3838. # Pass 2: content does not end in newline
  3839. self._write(name, self.with_matching_block)
  3840. ret = self.run_state(
  3841. "file.blockreplace",
  3842. name=name,
  3843. content=self.content.rstrip("\r\n"),
  3844. marker_start=self.marker_start,
  3845. marker_end=self.marker_end,
  3846. append_newline=False,
  3847. )
  3848. self.assertSaltTrueReturn(ret)
  3849. self.assertTrue(ret[next(iter(ret))]["changes"])
  3850. self.assertEqual(
  3851. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3852. )
  3853. # Pass 2a: Re-run state, no changes should be made
  3854. ret = self.run_state(
  3855. "file.blockreplace",
  3856. name=name,
  3857. content=self.content.rstrip("\r\n"),
  3858. marker_start=self.marker_start,
  3859. marker_end=self.marker_end,
  3860. append_newline=False,
  3861. )
  3862. self.assertSaltTrueReturn(ret)
  3863. self.assertFalse(ret[next(iter(ret))]["changes"])
  3864. self.assertEqual(
  3865. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3866. )
  3867. @with_tempfile()
  3868. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3869. def test_matching_block_and_marker_not_after_newline(self, name):
  3870. """
  3871. Test blockreplace when block exists and its contents are a match, but
  3872. the marker_end is not directly preceded by a newline.
  3873. """
  3874. # Pass 1: content ends in newline
  3875. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3876. ret = self.run_state(
  3877. "file.blockreplace",
  3878. name=name,
  3879. content=self.content,
  3880. marker_start=self.marker_start,
  3881. marker_end=self.marker_end,
  3882. )
  3883. self.assertSaltTrueReturn(ret)
  3884. self.assertTrue(ret[next(iter(ret))]["changes"])
  3885. self.assertEqual(self._read(name), self.with_matching_block)
  3886. # Pass 1a: Re-run state, no changes should be made
  3887. ret = self.run_state(
  3888. "file.blockreplace",
  3889. name=name,
  3890. content=self.content,
  3891. marker_start=self.marker_start,
  3892. marker_end=self.marker_end,
  3893. )
  3894. self.assertSaltTrueReturn(ret)
  3895. self.assertFalse(ret[next(iter(ret))]["changes"])
  3896. self.assertEqual(self._read(name), self.with_matching_block)
  3897. # Pass 2: content does not end in newline
  3898. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3899. ret = self.run_state(
  3900. "file.blockreplace",
  3901. name=name,
  3902. content=self.content.rstrip("\r\n"),
  3903. marker_start=self.marker_start,
  3904. marker_end=self.marker_end,
  3905. )
  3906. self.assertSaltTrueReturn(ret)
  3907. self.assertTrue(ret[next(iter(ret))]["changes"])
  3908. self.assertEqual(self._read(name), self.with_matching_block)
  3909. # Pass 2a: Re-run state, no changes should be made
  3910. ret = self.run_state(
  3911. "file.blockreplace",
  3912. name=name,
  3913. content=self.content.rstrip("\r\n"),
  3914. marker_start=self.marker_start,
  3915. marker_end=self.marker_end,
  3916. )
  3917. self.assertSaltTrueReturn(ret)
  3918. self.assertFalse(ret[next(iter(ret))]["changes"])
  3919. self.assertEqual(self._read(name), self.with_matching_block)
  3920. @with_tempfile()
  3921. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3922. def test_matching_block_and_marker_not_after_newline_append_newline(self, name):
  3923. """
  3924. Test blockreplace when block exists and its contents are a match, but
  3925. the marker_end is not directly preceded by a newline. Test with
  3926. append_newline explicitly set to True. This will result in an extra
  3927. newline when the content ends in a newline, and will not when the
  3928. content does not end in a newline.
  3929. """
  3930. # Pass 1: content ends in newline
  3931. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3932. ret = self.run_state(
  3933. "file.blockreplace",
  3934. name=name,
  3935. content=self.content,
  3936. marker_start=self.marker_start,
  3937. marker_end=self.marker_end,
  3938. append_newline=True,
  3939. )
  3940. self.assertSaltTrueReturn(ret)
  3941. self.assertTrue(ret[next(iter(ret))]["changes"])
  3942. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3943. # Pass 1a: Re-run state, no changes should be made
  3944. ret = self.run_state(
  3945. "file.blockreplace",
  3946. name=name,
  3947. content=self.content,
  3948. marker_start=self.marker_start,
  3949. marker_end=self.marker_end,
  3950. append_newline=True,
  3951. )
  3952. self.assertSaltTrueReturn(ret)
  3953. self.assertFalse(ret[next(iter(ret))]["changes"])
  3954. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3955. # Pass 2: content does not end in newline
  3956. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3957. ret = self.run_state(
  3958. "file.blockreplace",
  3959. name=name,
  3960. content=self.content.rstrip("\r\n"),
  3961. marker_start=self.marker_start,
  3962. marker_end=self.marker_end,
  3963. append_newline=True,
  3964. )
  3965. self.assertSaltTrueReturn(ret)
  3966. self.assertTrue(ret[next(iter(ret))]["changes"])
  3967. self.assertEqual(self._read(name), self.with_matching_block)
  3968. # Pass 2a: Re-run state, no changes should be made
  3969. ret = self.run_state(
  3970. "file.blockreplace",
  3971. name=name,
  3972. content=self.content.rstrip("\r\n"),
  3973. marker_start=self.marker_start,
  3974. marker_end=self.marker_end,
  3975. append_newline=True,
  3976. )
  3977. self.assertSaltTrueReturn(ret)
  3978. self.assertFalse(ret[next(iter(ret))]["changes"])
  3979. self.assertEqual(self._read(name), self.with_matching_block)
  3980. @with_tempfile()
  3981. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  3982. def test_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
  3983. """
  3984. Test blockreplace when block exists and its contents are a match, but
  3985. the marker_end is not directly preceded by a newline. Test with
  3986. append_newline explicitly set to False.
  3987. """
  3988. # Pass 1: content ends in newline
  3989. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3990. ret = self.run_state(
  3991. "file.blockreplace",
  3992. name=name,
  3993. content=self.content,
  3994. marker_start=self.marker_start,
  3995. marker_end=self.marker_end,
  3996. append_newline=False,
  3997. )
  3998. self.assertSaltTrueReturn(ret)
  3999. self.assertTrue(ret[next(iter(ret))]["changes"])
  4000. self.assertEqual(self._read(name), self.with_matching_block)
  4001. # Pass 1a: Re-run state, no changes should be made
  4002. ret = self.run_state(
  4003. "file.blockreplace",
  4004. name=name,
  4005. content=self.content,
  4006. marker_start=self.marker_start,
  4007. marker_end=self.marker_end,
  4008. append_newline=False,
  4009. )
  4010. self.assertSaltTrueReturn(ret)
  4011. self.assertFalse(ret[next(iter(ret))]["changes"])
  4012. self.assertEqual(self._read(name), self.with_matching_block)
  4013. # Pass 2: content does not end in newline
  4014. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  4015. ret = self.run_state(
  4016. "file.blockreplace",
  4017. name=name,
  4018. content=self.content.rstrip("\r\n"),
  4019. marker_start=self.marker_start,
  4020. marker_end=self.marker_end,
  4021. append_newline=False,
  4022. )
  4023. self.assertSaltTrueReturn(ret)
  4024. self.assertFalse(ret[next(iter(ret))]["changes"])
  4025. self.assertEqual(
  4026. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  4027. )
  4028. # Pass 2a: Re-run state, no changes should be made
  4029. ret = self.run_state(
  4030. "file.blockreplace",
  4031. name=name,
  4032. content=self.content.rstrip("\r\n"),
  4033. marker_start=self.marker_start,
  4034. marker_end=self.marker_end,
  4035. append_newline=False,
  4036. )
  4037. self.assertSaltTrueReturn(ret)
  4038. self.assertFalse(ret[next(iter(ret))]["changes"])
  4039. self.assertEqual(
  4040. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  4041. )
  4042. @with_tempfile()
  4043. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  4044. def test_issue_49043(self, name):
  4045. ret = self.run_function("state.sls", mods="issue-49043", pillar={"name": name},)
  4046. log.error("ret = %s", repr(ret))
  4047. diff = "--- \n+++ \n@@ -0,0 +1,3 @@\n"
  4048. diff += dedent(
  4049. """\
  4050. +#-- start managed zone --
  4051. +äöü
  4052. +#-- end managed zone --
  4053. """
  4054. )
  4055. job = "file_|-somefile-blockreplace_|-{}_|-blockreplace".format(name)
  4056. self.assertEqual(ret[job]["changes"]["diff"], diff)
  4057. @pytest.mark.windows_whitelisted
  4058. class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
  4059. """
  4060. Uses a local tornado webserver to test http(s) file.managed states with and
  4061. without skip_verify
  4062. """
  4063. @classmethod
  4064. def setUpClass(cls):
  4065. cls.webserver = Webserver()
  4066. cls.webserver.start()
  4067. cls.source = cls.webserver.url("grail/scene33")
  4068. if IS_WINDOWS:
  4069. # CRLF vs LF causes a different hash on windows
  4070. cls.source_hash = "21438b3d5fd2c0028bcab92f7824dc69"
  4071. else:
  4072. cls.source_hash = "d2feb3beb323c79fc7a0f44f1408b4a3"
  4073. @classmethod
  4074. def tearDownClass(cls):
  4075. cls.webserver.stop()
  4076. @with_tempfile(create=False)
  4077. def setUp(self, name): # pylint: disable=arguments-differ
  4078. self.name = name
  4079. def tearDown(self):
  4080. try:
  4081. os.remove(self.name)
  4082. except OSError as exc:
  4083. if exc.errno != errno.ENOENT:
  4084. six.reraise(*sys.exc_info())
  4085. def run_state(self, *args, **kwargs): # pylint: disable=arguments-differ
  4086. ret = super(RemoteFileTest, self).run_state(*args, **kwargs)
  4087. log.debug("ret = %s", ret)
  4088. return ret
  4089. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  4090. def test_file_managed_http_source_no_hash(self):
  4091. """
  4092. Test a remote file with no hash
  4093. """
  4094. ret = self.run_state(
  4095. "file.managed", name=self.name, source=self.source, skip_verify=False
  4096. )
  4097. # This should fail because no hash was provided
  4098. self.assertSaltFalseReturn(ret)
  4099. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  4100. def test_file_managed_http_source(self):
  4101. """
  4102. Test a remote file with no hash
  4103. """
  4104. ret = self.run_state(
  4105. "file.managed",
  4106. name=self.name,
  4107. source=self.source,
  4108. source_hash=self.source_hash,
  4109. skip_verify=False,
  4110. )
  4111. self.assertSaltTrueReturn(ret)
  4112. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  4113. def test_file_managed_http_source_skip_verify(self):
  4114. """
  4115. Test a remote file using skip_verify
  4116. """
  4117. ret = self.run_state(
  4118. "file.managed", name=self.name, source=self.source, skip_verify=True
  4119. )
  4120. self.assertSaltTrueReturn(ret)
  4121. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  4122. def test_file_managed_keep_source_false_http(self):
  4123. """
  4124. This test ensures that we properly clean the cached file if keep_source
  4125. is set to False, for source files using an http:// URL
  4126. """
  4127. # Run the state
  4128. ret = self.run_state(
  4129. "file.managed",
  4130. name=self.name,
  4131. source=self.source,
  4132. source_hash=self.source_hash,
  4133. keep_source=False,
  4134. )
  4135. ret = ret[next(iter(ret))]
  4136. assert ret["result"] is True
  4137. # Now make sure that the file is not cached
  4138. result = self.run_function("cp.is_cached", [self.source])
  4139. assert result == "", "File is still cached at {0}".format(result)
  4140. @skipIf(not salt.utils.path.which("patch"), "patch is not installed")
  4141. @pytest.mark.windows_whitelisted
  4142. class PatchTest(ModuleCase, SaltReturnAssertsMixin):
  4143. def _check_patch_version(self, min_version):
  4144. """
  4145. patch version check
  4146. """
  4147. version = self.run_function("cmd.run", ["patch --version"]).splitlines()[0]
  4148. version = version.split()[1]
  4149. if _LooseVersion(version) < _LooseVersion(min_version):
  4150. self.skipTest(
  4151. "Minimum patch version required: {0}. "
  4152. "Patch version installed: {1}".format(min_version, version)
  4153. )
  4154. @classmethod
  4155. def setUpClass(cls):
  4156. cls.webserver = Webserver()
  4157. cls.webserver.start()
  4158. cls.numbers_patch_name = "numbers.patch"
  4159. cls.math_patch_name = "math.patch"
  4160. cls.all_patch_name = "all.patch"
  4161. cls.numbers_patch_template_name = cls.numbers_patch_name + ".jinja"
  4162. cls.math_patch_template_name = cls.math_patch_name + ".jinja"
  4163. cls.all_patch_template_name = cls.all_patch_name + ".jinja"
  4164. cls.numbers_patch_path = "patches/" + cls.numbers_patch_name
  4165. cls.math_patch_path = "patches/" + cls.math_patch_name
  4166. cls.all_patch_path = "patches/" + cls.all_patch_name
  4167. cls.numbers_patch_template_path = "patches/" + cls.numbers_patch_template_name
  4168. cls.math_patch_template_path = "patches/" + cls.math_patch_template_name
  4169. cls.all_patch_template_path = "patches/" + cls.all_patch_template_name
  4170. cls.numbers_patch = "salt://" + cls.numbers_patch_path
  4171. cls.math_patch = "salt://" + cls.math_patch_path
  4172. cls.all_patch = "salt://" + cls.all_patch_path
  4173. cls.numbers_patch_template = "salt://" + cls.numbers_patch_template_path
  4174. cls.math_patch_template = "salt://" + cls.math_patch_template_path
  4175. cls.all_patch_template = "salt://" + cls.all_patch_template_path
  4176. cls.numbers_patch_http = cls.webserver.url(cls.numbers_patch_path)
  4177. cls.math_patch_http = cls.webserver.url(cls.math_patch_path)
  4178. cls.all_patch_http = cls.webserver.url(cls.all_patch_path)
  4179. cls.numbers_patch_template_http = cls.webserver.url(
  4180. cls.numbers_patch_template_path
  4181. )
  4182. cls.math_patch_template_http = cls.webserver.url(cls.math_patch_template_path)
  4183. cls.all_patch_template_http = cls.webserver.url(cls.all_patch_template_path)
  4184. patches_dir = os.path.join(RUNTIME_VARS.FILES, "file", "base", "patches")
  4185. cls.numbers_patch_hash = salt.utils.hashutils.get_hash(
  4186. os.path.join(patches_dir, cls.numbers_patch_name)
  4187. )
  4188. cls.math_patch_hash = salt.utils.hashutils.get_hash(
  4189. os.path.join(patches_dir, cls.math_patch_name)
  4190. )
  4191. cls.all_patch_hash = salt.utils.hashutils.get_hash(
  4192. os.path.join(patches_dir, cls.all_patch_name)
  4193. )
  4194. cls.numbers_patch_template_hash = salt.utils.hashutils.get_hash(
  4195. os.path.join(patches_dir, cls.numbers_patch_template_name)
  4196. )
  4197. cls.math_patch_template_hash = salt.utils.hashutils.get_hash(
  4198. os.path.join(patches_dir, cls.math_patch_template_name)
  4199. )
  4200. cls.all_patch_template_hash = salt.utils.hashutils.get_hash(
  4201. os.path.join(patches_dir, cls.all_patch_template_name)
  4202. )
  4203. cls.context = {"two": "two", "ten": 10}
  4204. @classmethod
  4205. def tearDownClass(cls):
  4206. cls.webserver.stop()
  4207. def setUp(self):
  4208. """
  4209. Create a new unpatched set of files
  4210. """
  4211. self.base_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  4212. os.makedirs(os.path.join(self.base_dir, "foo", "bar"))
  4213. self.numbers_file = os.path.join(self.base_dir, "foo", "numbers.txt")
  4214. self.math_file = os.path.join(self.base_dir, "foo", "bar", "math.txt")
  4215. with salt.utils.files.fopen(self.numbers_file, "w") as fp_:
  4216. fp_.write(
  4217. textwrap.dedent(
  4218. """\
  4219. one
  4220. two
  4221. three
  4222. 1
  4223. 2
  4224. 3
  4225. """
  4226. )
  4227. )
  4228. with salt.utils.files.fopen(self.math_file, "w") as fp_:
  4229. fp_.write(
  4230. textwrap.dedent(
  4231. """\
  4232. Five plus five is ten
  4233. Four squared is sixteen
  4234. """
  4235. )
  4236. )
  4237. self.addCleanup(shutil.rmtree, self.base_dir, ignore_errors=True)
  4238. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4239. def test_patch_single_file(self):
  4240. """
  4241. Test file.patch using a patch applied to a single file
  4242. """
  4243. ret = self.run_state(
  4244. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4245. )
  4246. self.assertSaltTrueReturn(ret)
  4247. ret = ret[next(iter(ret))]
  4248. self.assertEqual(ret["comment"], "Patch successfully applied")
  4249. # Re-run the state, should succeed and there should be a message about
  4250. # a partially-applied hunk.
  4251. ret = self.run_state(
  4252. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4253. )
  4254. self.assertSaltTrueReturn(ret)
  4255. ret = ret[next(iter(ret))]
  4256. self.assertEqual(ret["comment"], "Patch was already applied")
  4257. self.assertEqual(ret["changes"], {})
  4258. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4259. def test_patch_directory(self):
  4260. """
  4261. Test file.patch using a patch applied to a directory, with changes
  4262. spanning multiple files.
  4263. """
  4264. self._check_patch_version("2.6")
  4265. ret = self.run_state(
  4266. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4267. )
  4268. self.assertSaltTrueReturn(ret)
  4269. ret = ret[next(iter(ret))]
  4270. self.assertEqual(ret["comment"], "Patch successfully applied")
  4271. # Re-run the state, should succeed and there should be a message about
  4272. # a partially-applied hunk.
  4273. ret = self.run_state(
  4274. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4275. )
  4276. self.assertSaltTrueReturn(ret)
  4277. ret = ret[next(iter(ret))]
  4278. self.assertEqual(ret["comment"], "Patch was already applied")
  4279. self.assertEqual(ret["changes"], {})
  4280. @pytest.mark.slow_test(seconds=10) # Test takes >5 and <=10 seconds
  4281. def test_patch_strip_parsing(self):
  4282. """
  4283. Test that we successfuly parse -p/--strip when included in the options
  4284. """
  4285. self._check_patch_version("2.6")
  4286. # Run the state using -p1
  4287. ret = self.run_state(
  4288. "file.patch", name=self.base_dir, source=self.all_patch, options="-p1",
  4289. )
  4290. self.assertSaltTrueReturn(ret)
  4291. ret = ret[next(iter(ret))]
  4292. self.assertEqual(ret["comment"], "Patch successfully applied")
  4293. # Re-run the state using --strip=1
  4294. ret = self.run_state(
  4295. "file.patch",
  4296. name=self.base_dir,
  4297. source=self.all_patch,
  4298. options="--strip=1",
  4299. )
  4300. self.assertSaltTrueReturn(ret)
  4301. ret = ret[next(iter(ret))]
  4302. self.assertEqual(ret["comment"], "Patch was already applied")
  4303. self.assertEqual(ret["changes"], {})
  4304. # Re-run the state using --strip 1
  4305. ret = self.run_state(
  4306. "file.patch",
  4307. name=self.base_dir,
  4308. source=self.all_patch,
  4309. options="--strip 1",
  4310. )
  4311. self.assertSaltTrueReturn(ret)
  4312. ret = ret[next(iter(ret))]
  4313. self.assertEqual(ret["comment"], "Patch was already applied")
  4314. self.assertEqual(ret["changes"], {})
  4315. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4316. def test_patch_saltenv(self):
  4317. """
  4318. Test that we attempt to download the patch from a non-base saltenv
  4319. """
  4320. # This state will fail because we don't have a patch file in that
  4321. # environment, but that is OK, we just want to test that we're looking
  4322. # in an environment other than base.
  4323. ret = self.run_state(
  4324. "file.patch", name=self.math_file, source=self.math_patch, saltenv="prod",
  4325. )
  4326. self.assertSaltFalseReturn(ret)
  4327. ret = ret[next(iter(ret))]
  4328. self.assertEqual(
  4329. ret["comment"],
  4330. "Source file {0} not found in saltenv 'prod'".format(self.math_patch),
  4331. )
  4332. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4333. def test_patch_single_file_failure(self):
  4334. """
  4335. Test file.patch using a patch applied to a single file. This tests a
  4336. failed patch.
  4337. """
  4338. # Empty the file to ensure that the patch doesn't apply cleanly
  4339. with salt.utils.files.fopen(self.numbers_file, "w"):
  4340. pass
  4341. ret = self.run_state(
  4342. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4343. )
  4344. self.assertSaltFalseReturn(ret)
  4345. ret = ret[next(iter(ret))]
  4346. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4347. # Test the reject_file option and ensure that the rejects are written
  4348. # to the path specified.
  4349. reject_file = salt.utils.files.mkstemp()
  4350. ret = self.run_state(
  4351. "file.patch",
  4352. name=self.numbers_file,
  4353. source=self.numbers_patch,
  4354. reject_file=reject_file,
  4355. strip=1,
  4356. )
  4357. self.assertSaltFalseReturn(ret)
  4358. ret = ret[next(iter(ret))]
  4359. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4360. self.assertRegex(
  4361. ret["comment"], "saving rejects to (file )?{0}".format(reject_file)
  4362. )
  4363. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4364. def test_patch_directory_failure(self):
  4365. """
  4366. Test file.patch using a patch applied to a directory, with changes
  4367. spanning multiple files.
  4368. """
  4369. # Empty the file to ensure that the patch doesn't apply
  4370. with salt.utils.files.fopen(self.math_file, "w"):
  4371. pass
  4372. ret = self.run_state(
  4373. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4374. )
  4375. self.assertSaltFalseReturn(ret)
  4376. ret = ret[next(iter(ret))]
  4377. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4378. # Test the reject_file option and ensure that the rejects are written
  4379. # to the path specified.
  4380. reject_file = salt.utils.files.mkstemp()
  4381. ret = self.run_state(
  4382. "file.patch",
  4383. name=self.base_dir,
  4384. source=self.all_patch,
  4385. reject_file=reject_file,
  4386. strip=1,
  4387. )
  4388. self.assertSaltFalseReturn(ret)
  4389. ret = ret[next(iter(ret))]
  4390. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4391. self.assertRegex(
  4392. ret["comment"], "saving rejects to (file )?{0}".format(reject_file)
  4393. )
  4394. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4395. def test_patch_single_file_remote_source(self):
  4396. """
  4397. Test file.patch using a patch applied to a single file, with the patch
  4398. coming from a remote source.
  4399. """
  4400. # Try without a source_hash and without skip_verify=True, this should
  4401. # fail with a message about the source_hash
  4402. ret = self.run_state(
  4403. "file.patch", name=self.math_file, source=self.math_patch_http,
  4404. )
  4405. self.assertSaltFalseReturn(ret)
  4406. ret = ret[next(iter(ret))]
  4407. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4408. # Re-run the state with a source hash, it should now succeed
  4409. ret = self.run_state(
  4410. "file.patch",
  4411. name=self.math_file,
  4412. source=self.math_patch_http,
  4413. source_hash=self.math_patch_hash,
  4414. )
  4415. self.assertSaltTrueReturn(ret)
  4416. ret = ret[next(iter(ret))]
  4417. self.assertEqual(ret["comment"], "Patch successfully applied")
  4418. # Re-run again, this time with no hash and skip_verify=True to test
  4419. # skipping hash verification
  4420. ret = self.run_state(
  4421. "file.patch",
  4422. name=self.math_file,
  4423. source=self.math_patch_http,
  4424. skip_verify=True,
  4425. )
  4426. self.assertSaltTrueReturn(ret)
  4427. ret = ret[next(iter(ret))]
  4428. self.assertEqual(ret["comment"], "Patch was already applied")
  4429. self.assertEqual(ret["changes"], {})
  4430. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4431. def test_patch_directory_remote_source(self):
  4432. """
  4433. Test file.patch using a patch applied to a directory, with changes
  4434. spanning multiple files, and the patch file coming from a remote
  4435. source.
  4436. """
  4437. self._check_patch_version("2.6")
  4438. # Try without a source_hash and without skip_verify=True, this should
  4439. # fail with a message about the source_hash
  4440. ret = self.run_state(
  4441. "file.patch", name=self.base_dir, source=self.all_patch_http, strip=1,
  4442. )
  4443. self.assertSaltFalseReturn(ret)
  4444. ret = ret[next(iter(ret))]
  4445. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4446. # Re-run the state with a source hash, it should now succeed
  4447. ret = self.run_state(
  4448. "file.patch",
  4449. name=self.base_dir,
  4450. source=self.all_patch_http,
  4451. source_hash=self.all_patch_hash,
  4452. strip=1,
  4453. )
  4454. self.assertSaltTrueReturn(ret)
  4455. ret = ret[next(iter(ret))]
  4456. self.assertEqual(ret["comment"], "Patch successfully applied")
  4457. # Re-run again, this time with no hash and skip_verify=True to test
  4458. # skipping hash verification
  4459. ret = self.run_state(
  4460. "file.patch",
  4461. name=self.base_dir,
  4462. source=self.all_patch_http,
  4463. strip=1,
  4464. skip_verify=True,
  4465. )
  4466. self.assertSaltTrueReturn(ret)
  4467. ret = ret[next(iter(ret))]
  4468. self.assertEqual(ret["comment"], "Patch was already applied")
  4469. self.assertEqual(ret["changes"], {})
  4470. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4471. def test_patch_single_file_template(self):
  4472. """
  4473. Test file.patch using a patch applied to a single file, with jinja
  4474. templating applied to the patch file.
  4475. """
  4476. ret = self.run_state(
  4477. "file.patch",
  4478. name=self.numbers_file,
  4479. source=self.numbers_patch_template,
  4480. template="jinja",
  4481. context=self.context,
  4482. )
  4483. self.assertSaltTrueReturn(ret)
  4484. ret = ret[next(iter(ret))]
  4485. self.assertEqual(ret["comment"], "Patch successfully applied")
  4486. # Re-run the state, should succeed and there should be a message about
  4487. # a partially-applied hunk.
  4488. ret = self.run_state(
  4489. "file.patch",
  4490. name=self.numbers_file,
  4491. source=self.numbers_patch_template,
  4492. template="jinja",
  4493. context=self.context,
  4494. )
  4495. self.assertSaltTrueReturn(ret)
  4496. ret = ret[next(iter(ret))]
  4497. self.assertEqual(ret["comment"], "Patch was already applied")
  4498. self.assertEqual(ret["changes"], {})
  4499. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4500. def test_patch_directory_template(self):
  4501. """
  4502. Test file.patch using a patch applied to a directory, with changes
  4503. spanning multiple files, and with jinja templating applied to the patch
  4504. file.
  4505. """
  4506. self._check_patch_version("2.6")
  4507. ret = self.run_state(
  4508. "file.patch",
  4509. name=self.base_dir,
  4510. source=self.all_patch_template,
  4511. template="jinja",
  4512. context=self.context,
  4513. strip=1,
  4514. )
  4515. self.assertSaltTrueReturn(ret)
  4516. ret = ret[next(iter(ret))]
  4517. self.assertEqual(ret["comment"], "Patch successfully applied")
  4518. # Re-run the state, should succeed and there should be a message about
  4519. # a partially-applied hunk.
  4520. ret = self.run_state(
  4521. "file.patch",
  4522. name=self.base_dir,
  4523. source=self.all_patch_template,
  4524. template="jinja",
  4525. context=self.context,
  4526. strip=1,
  4527. )
  4528. self.assertSaltTrueReturn(ret)
  4529. ret = ret[next(iter(ret))]
  4530. self.assertEqual(ret["comment"], "Patch was already applied")
  4531. self.assertEqual(ret["changes"], {})
  4532. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4533. def test_patch_single_file_remote_source_template(self):
  4534. """
  4535. Test file.patch using a patch applied to a single file, with the patch
  4536. coming from a remote source.
  4537. """
  4538. # Try without a source_hash and without skip_verify=True, this should
  4539. # fail with a message about the source_hash
  4540. ret = self.run_state(
  4541. "file.patch",
  4542. name=self.math_file,
  4543. source=self.math_patch_template_http,
  4544. template="jinja",
  4545. context=self.context,
  4546. )
  4547. self.assertSaltFalseReturn(ret)
  4548. ret = ret[next(iter(ret))]
  4549. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4550. # Re-run the state with a source hash, it should now succeed
  4551. ret = self.run_state(
  4552. "file.patch",
  4553. name=self.math_file,
  4554. source=self.math_patch_template_http,
  4555. source_hash=self.math_patch_template_hash,
  4556. template="jinja",
  4557. context=self.context,
  4558. )
  4559. self.assertSaltTrueReturn(ret)
  4560. ret = ret[next(iter(ret))]
  4561. self.assertEqual(ret["comment"], "Patch successfully applied")
  4562. # Re-run again, this time with no hash and skip_verify=True to test
  4563. # skipping hash verification
  4564. ret = self.run_state(
  4565. "file.patch",
  4566. name=self.math_file,
  4567. source=self.math_patch_template_http,
  4568. template="jinja",
  4569. context=self.context,
  4570. skip_verify=True,
  4571. )
  4572. self.assertSaltTrueReturn(ret)
  4573. ret = ret[next(iter(ret))]
  4574. self.assertEqual(ret["comment"], "Patch was already applied")
  4575. self.assertEqual(ret["changes"], {})
  4576. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  4577. def test_patch_directory_remote_source_template(self):
  4578. """
  4579. Test file.patch using a patch applied to a directory, with changes
  4580. spanning multiple files, and the patch file coming from a remote
  4581. source.
  4582. """
  4583. self._check_patch_version("2.6")
  4584. # Try without a source_hash and without skip_verify=True, this should
  4585. # fail with a message about the source_hash
  4586. ret = self.run_state(
  4587. "file.patch",
  4588. name=self.base_dir,
  4589. source=self.all_patch_template_http,
  4590. template="jinja",
  4591. context=self.context,
  4592. strip=1,
  4593. )
  4594. self.assertSaltFalseReturn(ret)
  4595. ret = ret[next(iter(ret))]
  4596. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4597. # Re-run the state with a source hash, it should now succeed
  4598. ret = self.run_state(
  4599. "file.patch",
  4600. name=self.base_dir,
  4601. source=self.all_patch_template_http,
  4602. source_hash=self.all_patch_template_hash,
  4603. template="jinja",
  4604. context=self.context,
  4605. strip=1,
  4606. )
  4607. self.assertSaltTrueReturn(ret)
  4608. ret = ret[next(iter(ret))]
  4609. self.assertEqual(ret["comment"], "Patch successfully applied")
  4610. # Re-run again, this time with no hash and skip_verify=True to test
  4611. # skipping hash verification
  4612. ret = self.run_state(
  4613. "file.patch",
  4614. name=self.base_dir,
  4615. source=self.all_patch_template_http,
  4616. template="jinja",
  4617. context=self.context,
  4618. strip=1,
  4619. skip_verify=True,
  4620. )
  4621. self.assertSaltTrueReturn(ret)
  4622. ret = ret[next(iter(ret))]
  4623. self.assertEqual(ret["comment"], "Patch was already applied")
  4624. self.assertEqual(ret["changes"], {})
  4625. @pytest.mark.slow_test(seconds=10) # Test takes >5 and <=10 seconds
  4626. def test_patch_test_mode(self):
  4627. """
  4628. Test file.patch using test=True
  4629. """
  4630. # Try without a source_hash and without skip_verify=True, this should
  4631. # fail with a message about the source_hash
  4632. ret = self.run_state(
  4633. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4634. )
  4635. self.assertSaltNoneReturn(ret)
  4636. ret = ret[next(iter(ret))]
  4637. self.assertEqual(ret["comment"], "The patch would be applied")
  4638. self.assertTrue(ret["changes"])
  4639. # Apply the patch for real. We'll then be able to test below that we
  4640. # exit with a True rather than a None result if test=True is used on an
  4641. # already-applied patch.
  4642. ret = self.run_state(
  4643. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4644. )
  4645. self.assertSaltTrueReturn(ret)
  4646. ret = ret[next(iter(ret))]
  4647. self.assertEqual(ret["comment"], "Patch successfully applied")
  4648. self.assertTrue(ret["changes"])
  4649. # Run again with test=True. Since the pre-check happens before we do
  4650. # the __opts__['test'] check, we should exit with a True result just
  4651. # the same as if we try to run this state on an already-patched file
  4652. # *without* test=True.
  4653. ret = self.run_state(
  4654. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4655. )
  4656. self.assertSaltTrueReturn(ret)
  4657. ret = ret[next(iter(ret))]
  4658. self.assertEqual(ret["comment"], "Patch was already applied")
  4659. self.assertEqual(ret["changes"], {})
  4660. # Empty the file to ensure that the patch doesn't apply cleanly
  4661. with salt.utils.files.fopen(self.numbers_file, "w"):
  4662. pass
  4663. # Run again with test=True. Similar to the above run, we are testing
  4664. # that we return before we reach the __opts__['test'] check. In this
  4665. # case we should return a False result because we should already know
  4666. # by this point that the patch will not apply cleanly.
  4667. ret = self.run_state(
  4668. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4669. )
  4670. self.assertSaltFalseReturn(ret)
  4671. ret = ret[next(iter(ret))]
  4672. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4673. self.assertEqual(ret["changes"], {})
  4674. WIN_TEST_FILE = "c:/testfile"
  4675. @pytest.mark.destructive_test
  4676. @skipIf(not IS_WINDOWS, "windows test only")
  4677. @pytest.mark.windows_whitelisted
  4678. class WinFileTest(ModuleCase):
  4679. """
  4680. Test for the file state on Windows
  4681. """
  4682. def setUp(self):
  4683. self.run_state(
  4684. "file.managed", name=WIN_TEST_FILE, makedirs=True, contents="Only a test"
  4685. )
  4686. def tearDown(self):
  4687. self.run_state("file.absent", name=WIN_TEST_FILE)
  4688. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  4689. def test_file_managed(self):
  4690. """
  4691. Test file.managed on Windows
  4692. """
  4693. self.assertTrue(self.run_state("file.exists", name=WIN_TEST_FILE))
  4694. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  4695. def test_file_copy(self):
  4696. """
  4697. Test file.copy on Windows
  4698. """
  4699. ret = self.run_state(
  4700. "file.copy", name="c:/testfile_copy", makedirs=True, source=WIN_TEST_FILE
  4701. )
  4702. self.assertTrue(ret)
  4703. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  4704. def test_file_comment(self):
  4705. """
  4706. Test file.comment on Windows
  4707. """
  4708. self.run_state("file.comment", name=WIN_TEST_FILE, regex="^Only")
  4709. with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
  4710. self.assertTrue(fp_.read().startswith("#Only"))
  4711. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  4712. def test_file_replace(self):
  4713. """
  4714. Test file.replace on Windows
  4715. """
  4716. self.run_state(
  4717. "file.replace", name=WIN_TEST_FILE, pattern="test", repl="testing"
  4718. )
  4719. with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
  4720. self.assertIn("testing", fp_.read())
  4721. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  4722. def test_file_absent(self):
  4723. """
  4724. Test file.absent on Windows
  4725. """
  4726. ret = self.run_state("file.absent", name=WIN_TEST_FILE)
  4727. self.assertTrue(ret)